diff --git a/admin/commands/storage/backfill_tx_error_messages_test.go b/admin/commands/storage/backfill_tx_error_messages_test.go index 4fa11fecb1e..ce7a11211b2 100644 --- a/admin/commands/storage/backfill_tx_error_messages_test.go +++ b/admin/commands/storage/backfill_tx_error_messages_test.go @@ -77,6 +77,7 @@ func TestBackfillTxErrorMessages(t *testing.T) { func (suite *BackfillTxErrorMessagesSuite) SetupTest() { suite.log = zerolog.New(os.Stderr) + lockManager := storage.NewTestingLockManager() suite.state = new(protocolmock.State) suite.headers = new(storagemock.Headers) suite.receipts = new(storagemock.ExecutionReceipts) @@ -160,6 +161,7 @@ func (suite *BackfillTxErrorMessagesSuite) SetupTest() { errorMessageProvider, suite.txErrorMessages, executionNodeIdentitiesProvider, + lockManager, ) suite.command = NewBackfillTxErrorMessagesCommand( @@ -532,7 +534,7 @@ func (suite *BackfillTxErrorMessagesSuite) mockStoreTxErrorMessages( } } - suite.txErrorMessages.On("Store", blockID, txErrorMessages).Return(nil).Once() + suite.txErrorMessages.On("Store", mock.Anything, blockID, txErrorMessages).Return(nil).Once() } // assertAllExpectations asserts that all the expectations set on various mocks are met, diff --git a/cmd/access/node_builder/access_node_builder.go b/cmd/access/node_builder/access_node_builder.go index e06fd2ab29c..5c2dce02ebb 100644 --- a/cmd/access/node_builder/access_node_builder.go +++ b/cmd/access/node_builder/access_node_builder.go @@ -2217,6 +2217,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { notNil(builder.txResultErrorMessageProvider), builder.transactionResultErrorMessages, notNil(builder.ExecNodeIdentitiesProvider), + node.StorageLockMgr, ) } diff --git a/engine/access/ingestion/tx_error_messages/tx_error_messages_core.go b/engine/access/ingestion/tx_error_messages/tx_error_messages_core.go index 1ce681e051c..b08a9ae907f 100644 --- a/engine/access/ingestion/tx_error_messages/tx_error_messages_core.go +++ b/engine/access/ingestion/tx_error_messages/tx_error_messages_core.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + "github.com/jordanschalm/lockctx" "github.com/rs/zerolog" "github.com/onflow/flow-go/engine/access/rpc/backend/transactions/error_messages" @@ -24,6 +25,7 @@ type TxErrorMessagesCore struct { txErrorMessageProvider error_messages.Provider transactionResultErrorMessages storage.TransactionResultErrorMessages execNodeIdentitiesProvider *commonrpc.ExecutionNodeIdentitiesProvider + lockManager storage.LockManager } // NewTxErrorMessagesCore creates a new instance of TxErrorMessagesCore. @@ -32,12 +34,14 @@ func NewTxErrorMessagesCore( txErrorMessageProvider error_messages.Provider, transactionResultErrorMessages storage.TransactionResultErrorMessages, execNodeIdentitiesProvider *commonrpc.ExecutionNodeIdentitiesProvider, + lockManager storage.LockManager, ) *TxErrorMessagesCore { return &TxErrorMessagesCore{ log: log.With().Str("module", "tx_error_messages_core").Logger(), txErrorMessageProvider: txErrorMessageProvider, transactionResultErrorMessages: transactionResultErrorMessages, execNodeIdentitiesProvider: execNodeIdentitiesProvider, + lockManager: lockManager, } } @@ -62,6 +66,15 @@ func (c *TxErrorMessagesCore) FetchErrorMessages(ctx context.Context, blockID fl return c.FetchErrorMessagesByENs(ctx, blockID, execNodes) } +// FetchErrorMessagesByENs requests the transaction result error messages for the specified block ID from +// any of the given execution nodes and persists them once retrieved. This function blocks until ingesting +// the tx error messages is completed or failed. +// +// Note that transaction error messages are auxiliary data provided by the Execution Nodes on a goodwill basis and +// not protected by the protocol. Execution Error messages might be non-deterministic, i.e. potentially different +// for different execution nodes. Hence, we also persist which execution node (`execNode) provided the error message. +// +// It returns [storage.ErrAlreadyExists] if tx result error messages for the block already exist. func (c *TxErrorMessagesCore) FetchErrorMessagesByENs( ctx context.Context, blockID flow.Identifier, @@ -71,7 +84,6 @@ func (c *TxErrorMessagesCore) FetchErrorMessagesByENs( if err != nil { return fmt.Errorf("could not check existance of transaction result error messages: %w", err) } - if exists { return nil } @@ -100,14 +112,10 @@ func (c *TxErrorMessagesCore) FetchErrorMessagesByENs( return nil } -// storeTransactionResultErrorMessages stores the transaction result error messages for a given block ID. +// storeTransactionResultErrorMessages persists and indexes all transaction result error messages for the given blockID. +// The caller must acquire [storage.LockInsertTransactionResultErrMessage] and hold it until the write batch has been committed. // -// Parameters: -// - blockID: The identifier of the block for which the error messages are to be stored. -// - errorMessagesResponses: A slice of responses containing the error messages to be stored. -// - execNode: The execution node associated with the error messages. -// -// No errors are expected during normal operation. +// It returns [storage.ErrAlreadyExists] if tx result error messages for the block already exist. func (c *TxErrorMessagesCore) storeTransactionResultErrorMessages( blockID flow.Identifier, errorMessagesResponses []*execproto.GetTransactionErrorMessagesResponse_Result, @@ -124,7 +132,9 @@ func (c *TxErrorMessagesCore) storeTransactionResultErrorMessages( errorMessages = append(errorMessages, errorMessage) } - err := c.transactionResultErrorMessages.Store(blockID, errorMessages) + err := storage.WithLock(c.lockManager, storage.LockInsertTransactionResultErrMessage, func(lctx lockctx.Context) error { + return c.transactionResultErrorMessages.Store(lctx, blockID, errorMessages) + }) if err != nil { return fmt.Errorf("failed to store transaction error messages: %w", err) } diff --git a/engine/access/ingestion/tx_error_messages/tx_error_messages_core_test.go b/engine/access/ingestion/tx_error_messages/tx_error_messages_core_test.go index 04e0e8ac426..431aa215a69 100644 --- a/engine/access/ingestion/tx_error_messages/tx_error_messages_core_test.go +++ b/engine/access/ingestion/tx_error_messages/tx_error_messages_core_test.go @@ -5,6 +5,7 @@ import ( "fmt" "testing" + "github.com/jordanschalm/lockctx" execproto "github.com/onflow/flow/protobuf/go/flow/execution" "github.com/rs/zerolog" "github.com/stretchr/testify/mock" @@ -21,7 +22,8 @@ import ( "github.com/onflow/flow-go/module/irrecoverable" syncmock "github.com/onflow/flow-go/module/state_synchronization/mock" protocol "github.com/onflow/flow-go/state/protocol/mock" - storage "github.com/onflow/flow-go/storage/mock" + "github.com/onflow/flow-go/storage" + storagemock "github.com/onflow/flow-go/storage/mock" "github.com/onflow/flow-go/utils/unittest" ) @@ -37,13 +39,14 @@ type TxErrorMessagesCoreSuite struct { params *protocol.Params } - receipts *storage.ExecutionReceipts - txErrorMessages *storage.TransactionResultErrorMessages - lightTxResults *storage.LightTransactionResults + receipts *storagemock.ExecutionReceipts + txErrorMessages *storagemock.TransactionResultErrorMessages + lightTxResults *storagemock.LightTransactionResults reporter *syncmock.IndexReporter indexReporter *index.Reporter txResultsIndex *index.TransactionResultsIndex + lockManager storage.LockManager enNodeIDs flow.IdentityList execClient *accessmock.ExecutionAPIClient @@ -79,18 +82,21 @@ func (s *TxErrorMessagesCoreSuite) SetupTest() { s.proto.params = protocol.NewParams(s.T()) s.execClient = accessmock.NewExecutionAPIClient(s.T()) s.connFactory = connectionmock.NewConnectionFactory(s.T()) - s.receipts = storage.NewExecutionReceipts(s.T()) - s.txErrorMessages = storage.NewTransactionResultErrorMessages(s.T()) + s.receipts = storagemock.NewExecutionReceipts(s.T()) + s.txErrorMessages = storagemock.NewTransactionResultErrorMessages(s.T()) s.rootBlock = unittest.Block.Genesis(flow.Emulator) s.finalizedBlock = unittest.BlockWithParentFixture(s.rootBlock.ToHeader()).ToHeader() - s.lightTxResults = storage.NewLightTransactionResults(s.T()) + s.lightTxResults = storagemock.NewLightTransactionResults(s.T()) s.reporter = syncmock.NewIndexReporter(s.T()) s.indexReporter = index.NewReporter() err := s.indexReporter.Initialize(s.reporter) s.Require().NoError(err) s.txResultsIndex = index.NewTransactionResultsIndex(s.indexReporter, s.lightTxResults) + // Initialize lock manager for tests + s.lockManager = storage.NewTestingLockManager() + s.proto.state.On("Params").Return(s.proto.params) // Mock the finalized root block header with height 0. @@ -143,8 +149,11 @@ func (s *TxErrorMessagesCoreSuite) TestHandleTransactionResultErrorMessages() { expectedStoreTxErrorMessages := createExpectedTxErrorMessages(resultsByBlockID, s.enNodeIDs.NodeIDs()[0]) // Mock the storage of the fetched error messages into the protocol database. - s.txErrorMessages.On("Store", blockId, expectedStoreTxErrorMessages). - Return(nil).Once() + s.txErrorMessages.On("Store", mock.Anything, blockId, expectedStoreTxErrorMessages). + Return(func(lctx lockctx.Proof, blockID flow.Identifier, transactionResultErrorMessages []flow.TransactionResultErrorMessage) error { + require.True(s.T(), lctx.HoldsLock(storage.LockInsertTransactionResultErrMessage)) + return nil + }).Once() core := s.initCore() err := core.FetchErrorMessages(irrecoverableCtx, blockId) @@ -228,8 +237,11 @@ func (s *TxErrorMessagesCoreSuite) TestHandleTransactionResultErrorMessages_Erro // Simulate an error when attempting to store the fetched transaction error messages in storage. expectedStoreTxErrorMessages := createExpectedTxErrorMessages(resultsByBlockID, s.enNodeIDs.NodeIDs()[0]) - s.txErrorMessages.On("Store", blockId, expectedStoreTxErrorMessages). - Return(fmt.Errorf("storage error")).Once() + s.txErrorMessages.On("Store", mock.Anything, blockId, expectedStoreTxErrorMessages). + Return(func(lctx lockctx.Proof, blockID flow.Identifier, transactionResultErrorMessages []flow.TransactionResultErrorMessage) error { + require.True(s.T(), lctx.HoldsLock(storage.LockInsertTransactionResultErrMessage)) + return fmt.Errorf("storage error") + }).Once() core := s.initCore() err := core.FetchErrorMessages(irrecoverableCtx, blockId) @@ -268,6 +280,7 @@ func (s *TxErrorMessagesCoreSuite) initCore() *TxErrorMessagesCore { errorMessageProvider, s.txErrorMessages, execNodeIdentitiesProvider, + s.lockManager, ) return core } @@ -311,7 +324,7 @@ func mockTransactionResultsByBlock(count int) []flow.LightTransactionResult { // setupReceiptsForBlock sets up mock execution receipts for a block and returns the receipts along // with the identities of the execution nodes that processed them. -func setupReceiptsForBlock(receipts *storage.ExecutionReceipts, block *flow.Block, eNodeID flow.Identifier) { +func setupReceiptsForBlock(receipts *storagemock.ExecutionReceipts, block *flow.Block, eNodeID flow.Identifier) { receipt1 := unittest.ReceiptForBlockFixture(block) receipt1.ExecutorID = eNodeID receipt2 := unittest.ReceiptForBlockFixture(block) @@ -328,7 +341,7 @@ func setupReceiptsForBlock(receipts *storage.ExecutionReceipts, block *flow.Bloc } // setupReceiptsForBlockWithResult sets up mock execution receipts for a block with a specific execution result -func setupReceiptsForBlockWithResult(receipts *storage.ExecutionReceipts, executionResult *flow.ExecutionResult, executorIDs ...flow.Identifier) { +func setupReceiptsForBlockWithResult(receipts *storagemock.ExecutionReceipts, executionResult *flow.ExecutionResult, executorIDs ...flow.Identifier) { receiptList := make(flow.ExecutionReceiptList, 0, len(executorIDs)) for _, enID := range executorIDs { receiptList = append(receiptList, unittest.ExecutionReceiptFixture( diff --git a/engine/access/ingestion/tx_error_messages/tx_error_messages_engine_test.go b/engine/access/ingestion/tx_error_messages/tx_error_messages_engine_test.go index 7acc1f4ad01..d4bb86ad984 100644 --- a/engine/access/ingestion/tx_error_messages/tx_error_messages_engine_test.go +++ b/engine/access/ingestion/tx_error_messages/tx_error_messages_engine_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/jordanschalm/lockctx" execproto "github.com/onflow/flow/protobuf/go/flow/execution" "github.com/rs/zerolog" "github.com/stretchr/testify/mock" @@ -55,6 +56,7 @@ type TxErrorMessagesEngineSuite struct { reporter *syncmock.IndexReporter indexReporter *index.Reporter txResultsIndex *index.TransactionResultsIndex + lockManager storage.LockManager enNodeIDs flow.IdentityList execClient *accessmock.ExecutionAPIClient @@ -87,9 +89,13 @@ func (s *TxErrorMessagesEngineSuite) SetupTest() { s.log = unittest.Logger() s.metrics = metrics.NewNoopCollector() s.ctx, s.cancel = context.WithCancel(context.Background()) + + // Initialize database and lock manager pdb, dbDir := unittest.TempPebbleDB(s.T()) s.db = pebbleimpl.ToDB(pdb) s.dbDir = dbDir + s.lockManager = storage.NewTestingLockManager() + // mock out protocol state s.proto.state = protocol.NewFollowerState(s.T()) s.proto.snapshot = protocol.NewSnapshot(s.T()) @@ -177,6 +183,7 @@ func (s *TxErrorMessagesEngineSuite) initEngine(ctx irrecoverable.SignalerContex errorMessageProvider, s.txErrorMessages, execNodeIdentitiesProvider, + s.lockManager, ) eng, err := New( @@ -245,10 +252,12 @@ func (s *TxErrorMessagesEngineSuite) TestOnFinalizedBlockHandleTxErrorMessages() expectedStoreTxErrorMessages := createExpectedTxErrorMessages(resultsByBlockID, s.enNodeIDs.NodeIDs()[0]) // Mock the storage of the fetched error messages into the protocol database. - s.txErrorMessages.On("Store", blockID, expectedStoreTxErrorMessages).Return(nil). + s.txErrorMessages.On("Store", mock.Anything, blockID, expectedStoreTxErrorMessages).Return(nil). Run(func(args mock.Arguments) { - // Ensure the test does not complete its work faster than necessary - wg.Done() + lctx, ok := args[0].(lockctx.Proof) + require.True(s.T(), ok, "expecting lock proof, but cast failed") + require.True(s.T(), lctx.HoldsLock(storage.LockInsertTransactionResultErrMessage)) + wg.Done() // Ensure the test does not complete its work faster than necessary }).Once() } diff --git a/module/executiondatasync/optimistic_sync/core.go b/module/executiondatasync/optimistic_sync/core.go index c222b28a8ac..044485a7ad5 100644 --- a/module/executiondatasync/optimistic_sync/core.go +++ b/module/executiondatasync/optimistic_sync/core.go @@ -335,7 +335,7 @@ func (c *CoreImpl) Persist() error { stores.NewEventsStore(indexerData.Events, c.workingData.persistentEvents, c.executionResult.BlockID), stores.NewResultsStore(indexerData.Results, c.workingData.persistentResults, c.executionResult.BlockID), stores.NewCollectionsStore(indexerData.Collections, c.workingData.persistentCollections), - stores.NewTxResultErrMsgStore(c.workingData.txResultErrMsgsData, c.workingData.persistentTxResultErrMsgs, c.executionResult.BlockID), + stores.NewTxResultErrMsgStore(c.workingData.txResultErrMsgsData, c.workingData.persistentTxResultErrMsgs, c.executionResult.BlockID, c.workingData.lockManager), stores.NewLatestSealedResultStore(c.workingData.latestPersistedSealedResult, c.executionResult.ID(), c.block.Height), } blockPersister := persisters.NewBlockPersister( diff --git a/module/executiondatasync/optimistic_sync/core_impl_test.go b/module/executiondatasync/optimistic_sync/core_impl_test.go index 19b87866603..a078d885fa6 100644 --- a/module/executiondatasync/optimistic_sync/core_impl_test.go +++ b/module/executiondatasync/optimistic_sync/core_impl_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/jordanschalm/lockctx" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" @@ -446,9 +447,21 @@ func (c *CoreImplSuite) TestCoreImpl_Persist() { indexerData := core.workingData.indexerData c.persistentRegisters.On("Store", flow.RegisterEntries(indexerData.Registers), tf.block.Height).Return(nil) c.persistentEvents.On("BatchStore", blockID, []flow.EventsList{indexerData.Events}, mock.Anything).Return(nil) - c.persistentCollections.On("BatchStoreAndIndexByTransaction", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) - c.persistentResults.On("BatchStore", blockID, indexerData.Results, mock.Anything).Return(nil) - c.persistentTxResultErrMsg.On("BatchStore", blockID, core.workingData.txResultErrMsgsData, mock.Anything).Return(nil) + c.persistentCollections.On("BatchStoreAndIndexByTransaction", + mock.MatchedBy(func(lctx lockctx.Proof) bool { + return lctx.HoldsLock(storage.LockInsertCollection) + }), + mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) + c.persistentResults.On("BatchStore", + mock.MatchedBy(func(lctx lockctx.Proof) bool { + return lctx.HoldsLock(storage.LockInsertLightTransactionResult) + }), + mock.Anything, blockID, indexerData.Results).Return(nil) + c.persistentTxResultErrMsg.On("BatchStore", + mock.MatchedBy(func(lctx lockctx.Proof) bool { + return lctx.HoldsLock(storage.LockInsertTransactionResultErrMessage) + }), + mock.Anything, blockID, core.workingData.txResultErrMsgsData).Return(nil) c.latestPersistedSealedResult.On("BatchSet", tf.exeResult.ID(), tf.block.Height, mock.Anything).Return(nil) err = core.Persist() diff --git a/module/executiondatasync/optimistic_sync/persisters/block.go b/module/executiondatasync/optimistic_sync/persisters/block.go index 371ed98b149..76262611a1f 100644 --- a/module/executiondatasync/optimistic_sync/persisters/block.go +++ b/module/executiondatasync/optimistic_sync/persisters/block.go @@ -60,20 +60,19 @@ func (p *BlockPersister) Persist() error { p.log.Debug().Msg("started to persist execution data") start := time.Now() - lctx := p.lockManager.NewContext() - err := lctx.AcquireLock(storage.LockInsertCollection) - if err != nil { - return fmt.Errorf("could not acquire lock for inserting light collections: %w", err) - } - defer lctx.Release() - - err = p.protocolDB.WithReaderBatchWriter(func(batch storage.ReaderBatchWriter) error { - for _, persister := range p.persisterStores { - if err := persister.Persist(lctx, batch); err != nil { - return err + err := storage.WithLocks(p.lockManager, []string{ + storage.LockInsertCollection, + storage.LockInsertLightTransactionResult, + storage.LockInsertTransactionResultErrMessage, + }, func(lctx lockctx.Context) error { + return p.protocolDB.WithReaderBatchWriter(func(batch storage.ReaderBatchWriter) error { + for _, persister := range p.persisterStores { + if err := persister.Persist(lctx, batch); err != nil { + return err + } } - } - return nil + return nil + }) }) if err != nil { diff --git a/module/executiondatasync/optimistic_sync/persisters/block_test.go b/module/executiondatasync/optimistic_sync/persisters/block_test.go index c09b7bbb13c..d7bb6641f7a 100644 --- a/module/executiondatasync/optimistic_sync/persisters/block_test.go +++ b/module/executiondatasync/optimistic_sync/persisters/block_test.go @@ -111,7 +111,7 @@ func (p *PersisterSuite) testWithDatabase() { stores.NewEventsStore(p.indexerData.Events, events, p.executionResult.BlockID), stores.NewResultsStore(p.indexerData.Results, results, p.executionResult.BlockID), stores.NewCollectionsStore(p.indexerData.Collections, collections), - stores.NewTxResultErrMsgStore(p.txErrMsgs, txResultErrMsg, p.executionResult.BlockID), + stores.NewTxResultErrMsgStore(p.txErrMsgs, txResultErrMsg, p.executionResult.BlockID, lockManager), stores.NewLatestSealedResultStore(latestPersistedSealedResult, p.executionResult.ID(), p.header.Height), }, ) diff --git a/module/executiondatasync/optimistic_sync/persisters/stores/events.go b/module/executiondatasync/optimistic_sync/persisters/stores/events.go index cb7224c9096..840680c2113 100644 --- a/module/executiondatasync/optimistic_sync/persisters/stores/events.go +++ b/module/executiondatasync/optimistic_sync/persisters/stores/events.go @@ -1,6 +1,7 @@ package stores import ( + "errors" "fmt" "github.com/jordanschalm/lockctx" @@ -34,8 +35,16 @@ func NewEventsStore( // // No error returns are expected during normal operations func (e *EventsStore) Persist(_ lockctx.Proof, batch storage.ReaderBatchWriter) error { - if err := e.persistedEvents.BatchStore(e.blockID, []flow.EventsList{e.data}, batch); err != nil { + err := e.persistedEvents.BatchStore(e.blockID, []flow.EventsList{e.data}, batch) + if err != nil { + if errors.Is(err, storage.ErrAlreadyExists) { + // CAUTION: here we assume that if something is already stored for our blockID, then the data is identical. + // This only holds true for sealed execution results, whose consistency has previously been verified by + // comparing the data's hash to commitments in the execution result. + return nil + } return fmt.Errorf("could not add events to batch: %w", err) } + return nil } diff --git a/module/executiondatasync/optimistic_sync/persisters/stores/results.go b/module/executiondatasync/optimistic_sync/persisters/stores/results.go index 7cb9770c1a9..dca1fe94d30 100644 --- a/module/executiondatasync/optimistic_sync/persisters/stores/results.go +++ b/module/executiondatasync/optimistic_sync/persisters/stores/results.go @@ -1,6 +1,7 @@ package stores import ( + "errors" "fmt" "github.com/jordanschalm/lockctx" @@ -30,11 +31,19 @@ func NewResultsStore( } } -// Persist adds results to the batch. -// +// Persist saves and indexes all transaction results (light representation) for our block as part of the +// provided database batch. The caller must acquire [storage.LockInsertLightTransactionResult] and hold +// it until the write batch has been committed. // No error returns are expected during normal operations -func (r *ResultsStore) Persist(_ lockctx.Proof, batch storage.ReaderBatchWriter) error { - if err := r.persistedResults.BatchStore(r.blockID, r.data, batch); err != nil { +func (r *ResultsStore) Persist(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error { + err := r.persistedResults.BatchStore(lctx, rw, r.blockID, r.data) + if err != nil { + // CAUTION: here we assume that if something is already stored for our blockID, then the data is identical. + // This only holds true for sealed execution results, whose consistency has previously been verified by + // comparing the data's hash to commitments in the execution result. + if errors.Is(err, storage.ErrAlreadyExists) { + return nil + } return fmt.Errorf("could not add transaction results to batch: %w", err) } return nil diff --git a/module/executiondatasync/optimistic_sync/persisters/stores/transaction_result_error_messages.go b/module/executiondatasync/optimistic_sync/persisters/stores/transaction_result_error_messages.go index 5976d151e37..b6555ef3bba 100644 --- a/module/executiondatasync/optimistic_sync/persisters/stores/transaction_result_error_messages.go +++ b/module/executiondatasync/optimistic_sync/persisters/stores/transaction_result_error_messages.go @@ -1,6 +1,7 @@ package stores import ( + "errors" "fmt" "github.com/jordanschalm/lockctx" @@ -16,25 +17,36 @@ type TxResultErrMsgStore struct { data []flow.TransactionResultErrorMessage persistedTxResultErrMsg storage.TransactionResultErrorMessages blockID flow.Identifier + lockManager storage.LockManager } func NewTxResultErrMsgStore( data []flow.TransactionResultErrorMessage, persistedTxResultErrMsg storage.TransactionResultErrorMessages, blockID flow.Identifier, + lockManager storage.LockManager, ) *TxResultErrMsgStore { return &TxResultErrMsgStore{ data: data, persistedTxResultErrMsg: persistedTxResultErrMsg, blockID: blockID, + lockManager: lockManager, } } -// Persist adds transaction result error messages to the batch. -// +// Persist saves and indexes all transaction result error messages for our block as part of the +// provided database batch. The caller must acquire [storage.LockInsertTransactionResultErrMessage] +// and hold it until the write batch has been committed. // No error returns are expected during normal operations -func (t *TxResultErrMsgStore) Persist(_ lockctx.Proof, batch storage.ReaderBatchWriter) error { - if err := t.persistedTxResultErrMsg.BatchStore(t.blockID, t.data, batch); err != nil { +func (t *TxResultErrMsgStore) Persist(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error { + err := t.persistedTxResultErrMsg.BatchStore(lctx, rw, t.blockID, t.data) + if err != nil { + // CAUTION: here we assume that if something is already stored for our blockID, then the data is identical. + // This only holds true for sealed execution results, whose consistency has previously been verified by + // comparing the data's hash to commitments in the execution result. + if errors.Is(err, storage.ErrAlreadyExists) { + return nil + } return fmt.Errorf("could not add transaction result error messages to batch: %w", err) } return nil diff --git a/module/state_synchronization/indexer/indexer_core.go b/module/state_synchronization/indexer/indexer_core.go index 120c5b02dcb..60dc2d5bdf3 100644 --- a/module/state_synchronization/indexer/indexer_core.go +++ b/module/state_synchronization/indexer/indexer_core.go @@ -119,9 +119,8 @@ func (c *IndexerCore) RegisterValue(ID flow.RegisterID, height uint64) (flow.Reg // IndexBlockData indexes all execution block data by height. // This method shouldn't be used concurrently. -// -// Expected error returns during normal operation: -// - [storage.ErrNotFound]: if the block for execution data was not found +// Expected error returns during normal operations: +// - [storage.ErrNotFound] if the block for execution data was not found func (c *IndexerCore) IndexBlockData(data *execution_data.BlockExecutionDataEntity) error { header, err := c.headers.ByBlockID(data.BlockID) if err != nil { @@ -171,32 +170,33 @@ func (c *IndexerCore) IndexBlockData(data *execution_data.BlockExecutionDataEnti return fmt.Errorf("could not collect scheduled transaction data: %w", err) } - lctx := c.lockManager.NewContext() - defer lctx.Release() - if err = lctx.AcquireLock(storage.LockIndexScheduledTransaction); err != nil { - return fmt.Errorf("could not acquire lock for indexing scheduled transactions: %w", err) - } - - err = c.protocolDB.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { - err := c.events.BatchStore(data.BlockID, []flow.EventsList{events}, rw) - if err != nil { - return fmt.Errorf("could not index events at height %d: %w", header.Height, err) - } - - err = c.results.BatchStore(data.BlockID, results, rw) - if err != nil { - return fmt.Errorf("could not index transaction results at height %d: %w", header.Height, err) - } - - for txID, scheduledTxID := range scheduledTransactionData { - err = c.scheduledTransactions.BatchIndex(lctx, data.BlockID, txID, scheduledTxID, rw) - if err != nil { - return fmt.Errorf("could not index scheduled transaction (%d) %s at height %d: %w", scheduledTxID, txID, header.Height, err) - } - } - - return nil - }) + err = storage.WithLocks(c.lockManager, []string{ + storage.LockInsertLightTransactionResult, + storage.LockIndexScheduledTransaction, + }, + func(lctx lockctx.Context) error { + return c.protocolDB.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + err := c.events.BatchStore(data.BlockID, []flow.EventsList{events}, rw) + if err != nil { + return fmt.Errorf("could not index events at height %d: %w", header.Height, err) + } + + // requires the [storage.LockInsertLightTransactionResult] lock + err = c.results.BatchStore(lctx, rw, data.BlockID, results) + if err != nil { + return fmt.Errorf("could not index transaction results at height %d: %w", header.Height, err) + } + + for txID, scheduledTxID := range scheduledTransactionData { + err = c.scheduledTransactions.BatchIndex(lctx, data.BlockID, txID, scheduledTxID, rw) + if err != nil { + return fmt.Errorf("could not index scheduled transaction (%d) %s at height %d: %w", scheduledTxID, txID, header.Height, err) + } + } + + return nil + }) + }) if err != nil { return fmt.Errorf("could not commit block data: %w", err) diff --git a/module/state_synchronization/indexer/indexer_core_test.go b/module/state_synchronization/indexer/indexer_core_test.go index 0deba63b0fc..24ae7424c48 100644 --- a/module/state_synchronization/indexer/indexer_core_test.go +++ b/module/state_synchronization/indexer/indexer_core_test.go @@ -6,6 +6,7 @@ import ( "os" "testing" + "github.com/jordanschalm/lockctx" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -133,6 +134,27 @@ func (i *indexCoreTest) setStoreRegisters(f func(t *testing.T, entries flow.Regi return i } +func (i *indexCoreTest) setStoreEvents(f func(*testing.T, flow.Identifier, []flow.EventsList) error) *indexCoreTest { + i.events. + On("BatchStore", mock.AnythingOfType("flow.Identifier"), mock.AnythingOfType("[]flow.EventsList"), mock.Anything). + Return(func(blockID flow.Identifier, events []flow.EventsList, batch storage.ReaderBatchWriter) error { + require.NotNil(i.t, batch) + return f(i.t, blockID, events) + }) + return i +} + +func (i *indexCoreTest) setStoreTransactionResults(f func(*testing.T, flow.Identifier, []flow.LightTransactionResult) error) *indexCoreTest { + i.results. + On("BatchStore", mock.Anything, mock.Anything, mock.AnythingOfType("flow.Identifier"), mock.AnythingOfType("[]flow.LightTransactionResult")). + Return(func(lctx lockctx.Proof, batch storage.ReaderBatchWriter, blockID flow.Identifier, results []flow.LightTransactionResult) error { + require.True(i.t, lctx.HoldsLock(storage.LockInsertLightTransactionResult)) + require.NotNil(i.t, batch) + return f(i.t, blockID, results) + }) + return i +} + func (i *indexCoreTest) setGetRegisters(f func(t *testing.T, ID flow.RegisterID, height uint64) (flow.RegisterValue, error)) *indexCoreTest { i.registers. On("Get", mock.AnythingOfType("flow.RegisterID"), mock.AnythingOfType("uint64")). @@ -151,8 +173,12 @@ func (i *indexCoreTest) useDefaultEvents() *indexCoreTest { func (i *indexCoreTest) useDefaultTransactionResults() *indexCoreTest { i.results. - On("BatchStore", mock.AnythingOfType("flow.Identifier"), mock.AnythingOfType("[]flow.LightTransactionResult"), mock.Anything). - Return(nil) + On("BatchStore", mock.Anything, mock.Anything, mock.AnythingOfType("flow.Identifier"), mock.AnythingOfType("[]flow.LightTransactionResult")). + Return(func(lctx lockctx.Proof, batch storage.ReaderBatchWriter, _ flow.Identifier, _ []flow.LightTransactionResult) error { + require.True(i.t, lctx.HoldsLock(storage.LockInsertLightTransactionResult)) + require.NotNil(i.t, batch) + return nil + }) return i } @@ -231,8 +257,18 @@ func TestExecutionState_IndexBlockData(t *testing.T) { t.Run("Index AllTheThings", func(t *testing.T) { test := newIndexCoreTest(t, g, blocks, tf.ExecutionDataEntity()).initIndexer() - test.events.On("BatchStore", blockID, []flow.EventsList{tf.ExpectedEvents}, mock.Anything).Return(nil) - test.results.On("BatchStore", blockID, tf.ExpectedResults, mock.Anything).Return(nil) + test.events.On("BatchStore", mock.Anything, []flow.EventsList{tf.ExpectedEvents}, mock.Anything). + Return(func(blockID flow.Identifier, events []flow.EventsList, batch storage.ReaderBatchWriter) error { + require.NotNil(t, batch) + // Events BatchStore doesn't require specific locks, but we validate the batch is provided + return nil + }) + test.results.On("BatchStore", mock.Anything, mock.Anything, blockID, tf.ExpectedResults). + Return(func(lctx lockctx.Proof, batch storage.ReaderBatchWriter, blockID flow.Identifier, results []flow.LightTransactionResult) error { + require.True(t, lctx.HoldsLock(storage.LockInsertLightTransactionResult)) + require.NotNil(t, batch) + return nil + }) test.registers. On("Store", mock.Anything, tf.Block.Height). Run(func(args mock.Arguments) { @@ -245,7 +281,12 @@ func TestExecutionState_IndexBlockData(t *testing.T) { test.collections.On("StoreAndIndexByTransaction", mock.Anything, collection).Return(&flow.LightCollection{}, nil) } for txID, scheduledTxID := range tf.ExpectedScheduledTransactions { - test.scheduledTransactions.On("BatchIndex", mock.Anything, blockID, txID, scheduledTxID, mock.Anything).Return(nil) + test.scheduledTransactions.On("BatchIndex", mock.Anything, blockID, txID, scheduledTxID, mock.Anything). + Return(func(lctx lockctx.Proof, blockID flow.Identifier, txID flow.Identifier, scheduledTxID uint64, batch storage.ReaderBatchWriter) error { + require.True(t, lctx.HoldsLock(storage.LockIndexScheduledTransaction)) + require.NotNil(t, batch) + return nil + }) } err := test.indexer.IndexBlockData(tf.ExecutionDataEntity()) diff --git a/storage/events.go b/storage/events.go index 4062acea82e..853320709ae 100644 --- a/storage/events.go +++ b/storage/events.go @@ -23,9 +23,11 @@ type Events interface { EventsReader // Store will store events for the given block ID + // TODO: error documentation Store(blockID flow.Identifier, blockEvents []flow.EventsList) error // BatchStore will store events for the given block ID in a given batch + // TODO: error documentation BatchStore(blockID flow.Identifier, events []flow.EventsList, batch ReaderBatchWriter) error // BatchRemoveByBlockID removes events keyed by a blockID in provided batch diff --git a/storage/light_transaction_results.go b/storage/light_transaction_results.go index e2109d8e450..47ff7beac61 100644 --- a/storage/light_transaction_results.go +++ b/storage/light_transaction_results.go @@ -1,25 +1,28 @@ package storage -import "github.com/onflow/flow-go/model/flow" +import ( + "github.com/jordanschalm/lockctx" + + "github.com/onflow/flow-go/model/flow" +) // LightTransactionResultsReader represents persistent storage read operations for light transaction result type LightTransactionResultsReader interface { // ByBlockIDTransactionID returns the transaction result for the given block ID and transaction ID // - // Expected errors during normal operation: - // - `storage.ErrNotFound` if light transaction result at given blockID wasn't found. + // Expected error returns during normal operation: + // - [storage.ErrNotFound] if light transaction result at given blockID wasn't found. ByBlockIDTransactionID(blockID flow.Identifier, transactionID flow.Identifier) (*flow.LightTransactionResult, error) // ByBlockIDTransactionIndex returns the transaction result for the given blockID and transaction index // - // Expected errors during normal operation: - // - `storage.ErrNotFound` if light transaction result at given blockID and txIndex wasn't found. + // Expected error returns during normal operation: + // - [storage.ErrNotFound] if light transaction result at given blockID and txIndex wasn't found. ByBlockIDTransactionIndex(blockID flow.Identifier, txIndex uint32) (*flow.LightTransactionResult, error) // ByBlockID gets all transaction results for a block, ordered by transaction index - // - // Expected errors during normal operation: - // - `storage.ErrNotFound` if light transaction results at given blockID weren't found. + // CAUTION: this function returns the empty list in case for block IDs without known results. + // No error returns are expected during normal operations. ByBlockID(id flow.Identifier) ([]flow.LightTransactionResult, error) } @@ -27,9 +30,9 @@ type LightTransactionResultsReader interface { type LightTransactionResults interface { LightTransactionResultsReader - // BatchStore inserts a batch of transaction result into a batch - BatchStore(blockID flow.Identifier, transactionResults []flow.LightTransactionResult, rw ReaderBatchWriter) error - - // Deprecated: deprecated as a part of transition from Badger to Pebble. use BatchStore instead - BatchStoreBadger(blockID flow.Identifier, transactionResults []flow.LightTransactionResult, batch BatchStorage) error + // BatchStore persists and indexes all transaction results (light representation) for the given blockID + // as part of the provided batch. The caller must acquire [storage.LockInsertLightTransactionResult] and + // hold it until the write batch has been committed. + // It returns [storage.ErrAlreadyExists] if light transaction results for the block already exist. + BatchStore(lctx lockctx.Proof, rw ReaderBatchWriter, blockID flow.Identifier, transactionResults []flow.LightTransactionResult) error } diff --git a/storage/locks.go b/storage/locks.go index 20a84cd37f2..da3c2532fba 100644 --- a/storage/locks.go +++ b/storage/locks.go @@ -29,6 +29,10 @@ const ( LockBootstrapping = "lock_bootstrapping" // LockInsertChunkDataPack protects the insertion of chunk data packs (not yet used anywhere) LockInsertChunkDataPack = "lock_insert_chunk_data_pack" + // LockInsertTransactionResultErrMessage protects the insertion of transaction result error messages + LockInsertTransactionResultErrMessage = "lock_insert_transaction_result_message" + // LockInsertLightTransactionResult protects the insertion of light transaction results + LockInsertLightTransactionResult = "lock_insert_light_transaction_result" // LockInsertExecutionForkEvidence protects the insertion of execution fork evidence LockInsertExecutionForkEvidence = "lock_insert_execution_fork_evidence" LockInsertSafetyData = "lock_insert_safety_data" @@ -48,6 +52,8 @@ func Locks() []string { LockInsertCollection, LockBootstrapping, LockInsertChunkDataPack, + LockInsertTransactionResultErrMessage, + LockInsertLightTransactionResult, LockInsertExecutionForkEvidence, LockInsertSafetyData, LockInsertLivenessData, @@ -79,6 +85,11 @@ func makeLockPolicy() lockctx.Policy { Add(LockInsertSafetyData, LockInsertLivenessData). Add(LockInsertOrFinalizeClusterBlock, LockInsertSafetyData). Add(LockInsertOwnReceipt, LockInsertChunkDataPack). + + // module/executiondatasync/optimistic_sync/persisters/block.go#Persist + Add(LockInsertCollection, LockInsertLightTransactionResult). + Add(LockInsertLightTransactionResult, LockInsertTransactionResultErrMessage). + Add(LockInsertLightTransactionResult, LockIndexScheduledTransaction). Build() } diff --git a/storage/mock/light_transaction_results.go b/storage/mock/light_transaction_results.go index 6e6b277acb8..03f5f3326b3 100644 --- a/storage/mock/light_transaction_results.go +++ b/storage/mock/light_transaction_results.go @@ -3,7 +3,9 @@ package mock import ( + lockctx "github.com/jordanschalm/lockctx" flow "github.com/onflow/flow-go/model/flow" + mock "github.com/stretchr/testify/mock" storage "github.com/onflow/flow-go/storage" @@ -14,35 +16,17 @@ type LightTransactionResults struct { mock.Mock } -// BatchStore provides a mock function with given fields: blockID, transactionResults, rw -func (_m *LightTransactionResults) BatchStore(blockID flow.Identifier, transactionResults []flow.LightTransactionResult, rw storage.ReaderBatchWriter) error { - ret := _m.Called(blockID, transactionResults, rw) +// BatchStore provides a mock function with given fields: lctx, rw, blockID, transactionResults +func (_m *LightTransactionResults) BatchStore(lctx lockctx.Proof, rw storage.ReaderBatchWriter, blockID flow.Identifier, transactionResults []flow.LightTransactionResult) error { + ret := _m.Called(lctx, rw, blockID, transactionResults) if len(ret) == 0 { panic("no return value specified for BatchStore") } var r0 error - if rf, ok := ret.Get(0).(func(flow.Identifier, []flow.LightTransactionResult, storage.ReaderBatchWriter) error); ok { - r0 = rf(blockID, transactionResults, rw) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// BatchStoreBadger provides a mock function with given fields: blockID, transactionResults, batch -func (_m *LightTransactionResults) BatchStoreBadger(blockID flow.Identifier, transactionResults []flow.LightTransactionResult, batch storage.BatchStorage) error { - ret := _m.Called(blockID, transactionResults, batch) - - if len(ret) == 0 { - panic("no return value specified for BatchStoreBadger") - } - - var r0 error - if rf, ok := ret.Get(0).(func(flow.Identifier, []flow.LightTransactionResult, storage.BatchStorage) error); ok { - r0 = rf(blockID, transactionResults, batch) + if rf, ok := ret.Get(0).(func(lockctx.Proof, storage.ReaderBatchWriter, flow.Identifier, []flow.LightTransactionResult) error); ok { + r0 = rf(lctx, rw, blockID, transactionResults) } else { r0 = ret.Error(0) } diff --git a/storage/mock/transaction_result_error_messages.go b/storage/mock/transaction_result_error_messages.go index 3c1075fe3ac..16a940510ef 100644 --- a/storage/mock/transaction_result_error_messages.go +++ b/storage/mock/transaction_result_error_messages.go @@ -3,7 +3,9 @@ package mock import ( + lockctx "github.com/jordanschalm/lockctx" flow "github.com/onflow/flow-go/model/flow" + mock "github.com/stretchr/testify/mock" storage "github.com/onflow/flow-go/storage" @@ -14,17 +16,17 @@ type TransactionResultErrorMessages struct { mock.Mock } -// BatchStore provides a mock function with given fields: blockID, transactionResultErrorMessages, batch -func (_m *TransactionResultErrorMessages) BatchStore(blockID flow.Identifier, transactionResultErrorMessages []flow.TransactionResultErrorMessage, batch storage.ReaderBatchWriter) error { - ret := _m.Called(blockID, transactionResultErrorMessages, batch) +// BatchStore provides a mock function with given fields: lctx, rw, blockID, transactionResultErrorMessages +func (_m *TransactionResultErrorMessages) BatchStore(lctx lockctx.Proof, rw storage.ReaderBatchWriter, blockID flow.Identifier, transactionResultErrorMessages []flow.TransactionResultErrorMessage) error { + ret := _m.Called(lctx, rw, blockID, transactionResultErrorMessages) if len(ret) == 0 { panic("no return value specified for BatchStore") } var r0 error - if rf, ok := ret.Get(0).(func(flow.Identifier, []flow.TransactionResultErrorMessage, storage.ReaderBatchWriter) error); ok { - r0 = rf(blockID, transactionResultErrorMessages, batch) + if rf, ok := ret.Get(0).(func(lockctx.Proof, storage.ReaderBatchWriter, flow.Identifier, []flow.TransactionResultErrorMessage) error); ok { + r0 = rf(lctx, rw, blockID, transactionResultErrorMessages) } else { r0 = ret.Error(0) } @@ -150,17 +152,17 @@ func (_m *TransactionResultErrorMessages) Exists(blockID flow.Identifier) (bool, return r0, r1 } -// Store provides a mock function with given fields: blockID, transactionResultErrorMessages -func (_m *TransactionResultErrorMessages) Store(blockID flow.Identifier, transactionResultErrorMessages []flow.TransactionResultErrorMessage) error { - ret := _m.Called(blockID, transactionResultErrorMessages) +// Store provides a mock function with given fields: lctx, blockID, transactionResultErrorMessages +func (_m *TransactionResultErrorMessages) Store(lctx lockctx.Proof, blockID flow.Identifier, transactionResultErrorMessages []flow.TransactionResultErrorMessage) error { + ret := _m.Called(lctx, blockID, transactionResultErrorMessages) if len(ret) == 0 { panic("no return value specified for Store") } var r0 error - if rf, ok := ret.Get(0).(func(flow.Identifier, []flow.TransactionResultErrorMessage) error); ok { - r0 = rf(blockID, transactionResultErrorMessages) + if rf, ok := ret.Get(0).(func(lockctx.Proof, flow.Identifier, []flow.TransactionResultErrorMessage) error); ok { + r0 = rf(lctx, blockID, transactionResultErrorMessages) } else { r0 = ret.Error(0) } diff --git a/storage/operation/transaction_results.go b/storage/operation/transaction_results.go index a97197a5cde..8f5ae7ea791 100644 --- a/storage/operation/transaction_results.go +++ b/storage/operation/transaction_results.go @@ -3,6 +3,8 @@ package operation import ( "fmt" + "github.com/jordanschalm/lockctx" + "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/storage" ) @@ -26,7 +28,6 @@ func RetrieveTransactionResultByIndex(r storage.Reader, blockID flow.Identifier, // LookupTransactionResultsByBlockIDUsingIndex retrieves all tx results for a block, by using // tx_index index. This correctly handles cases of duplicate transactions within block. func LookupTransactionResultsByBlockIDUsingIndex(r storage.Reader, blockID flow.Identifier, txResults *[]flow.TransactionResult) error { - txErrIterFunc := func(keyCopy []byte, getValue func(destVal any) error) (bail bool, err error) { var val flow.TransactionResult err = getValue(&val) @@ -40,23 +41,12 @@ func LookupTransactionResultsByBlockIDUsingIndex(r storage.Reader, blockID flow. return TraverseByPrefix(r, MakePrefix(codeTransactionResultIndex, blockID), txErrIterFunc, storage.DefaultIteratorOptions()) } -// RemoveTransactionResultsByBlockID removes the transaction results for the given blockID -func RemoveTransactionResultsByBlockID(r storage.Reader, w storage.Writer, blockID flow.Identifier) error { - prefix := MakePrefix(codeTransactionResult, blockID) - err := RemoveByKeyPrefix(r, w, prefix) - if err != nil { - return fmt.Errorf("could not remove transaction results for block %v: %w", blockID, err) - } - - return nil -} - -// BatchRemoveTransactionResultsByBlockID removes transaction results for the given blockID in a provided batch. +// RemoveTransactionResultsByBlockID removes transaction results for the given blockID in a provided batch. // No errors are expected during normal operation, but it may return generic error // if badger fails to process request -func BatchRemoveTransactionResultsByBlockID(blockID flow.Identifier, batch storage.ReaderBatchWriter) error { +func RemoveTransactionResultsByBlockID(blockID flow.Identifier, rw storage.ReaderBatchWriter) error { prefix := MakePrefix(codeTransactionResult, blockID) - err := RemoveByKeyPrefix(batch.GlobalReader(), batch.Writer(), prefix) + err := RemoveByKeyPrefix(rw.GlobalReader(), rw.Writer(), prefix) if err != nil { return fmt.Errorf("could not remove transaction results for block %v: %w", blockID, err) } @@ -64,31 +54,65 @@ func BatchRemoveTransactionResultsByBlockID(blockID flow.Identifier, batch stora return nil } -// deprecated -func InsertLightTransactionResult(w storage.Writer, blockID flow.Identifier, transactionResult *flow.LightTransactionResult) error { - return UpsertByKey(w, MakePrefix(codeLightTransactionResult, blockID, transactionResult.TransactionID), transactionResult) -} +// InsertAndIndexLightTransactionResults persists and indexes all transaction results (light representation) for the given blockID +// as part of the provided batch. The caller must acquire [storage.LockInsertLightTransactionResult] and hold it until the write +// batch has been committed. +// It returns [storage.ErrAlreadyExists] if light transaction results for the block already exist. +func InsertAndIndexLightTransactionResults( + lctx lockctx.Proof, rw storage.ReaderBatchWriter, + blockID flow.Identifier, + transactionResults []flow.LightTransactionResult, +) error { + if !lctx.HoldsLock(storage.LockInsertLightTransactionResult) { + return fmt.Errorf("lock %s is not held", storage.LockInsertLightTransactionResult) + } -func BatchInsertLightTransactionResult(w storage.Writer, blockID flow.Identifier, transactionResult *flow.LightTransactionResult) error { - return UpsertByKey(w, MakePrefix(codeLightTransactionResult, blockID, transactionResult.TransactionID), transactionResult) -} + // ensure we don't overwrite existing light transaction results for this block + prefix := MakePrefix(codeLightTransactionResult, blockID) + checkExists := func(key []byte) error { + return fmt.Errorf("light transaction results for block %s already exist: %w", blockID, storage.ErrAlreadyExists) + } + err := IterateKeysByPrefixRange(rw.GlobalReader(), prefix, prefix, checkExists) + if err != nil { + return err + } -func BatchIndexLightTransactionResult(w storage.Writer, blockID flow.Identifier, txIndex uint32, transactionResult *flow.LightTransactionResult) error { - return UpsertByKey(w, MakePrefix(codeLightTransactionResultIndex, blockID, txIndex), transactionResult) + w := rw.Writer() + for i, result := range transactionResults { + // inserts a light transaction result by block ID and transaction ID + err := UpsertByKey(w, MakePrefix(codeLightTransactionResult, blockID, result.TransactionID), &result) + if err != nil { + return fmt.Errorf("cannot batch insert light tx result: %w", err) + } + // indexes a light transaction result by index within the block + err = UpsertByKey(w, MakePrefix(codeLightTransactionResultIndex, blockID, uint32(i)), &result) + if err != nil { + return fmt.Errorf("cannot batch index light tx result: %w", err) + } + } + return nil } +// RetrieveLightTransactionResult retrieves the result (light representation) of the specified transaction +// within the specified block. +// Expected error returns during normal operations: +// - [storage.ErrNotFound] if no result of a transaction with the specified ID in `blockID` is known func RetrieveLightTransactionResult(r storage.Reader, blockID flow.Identifier, transactionID flow.Identifier, transactionResult *flow.LightTransactionResult) error { return RetrieveByKey(r, MakePrefix(codeLightTransactionResult, blockID, transactionID), transactionResult) } +// RetrieveLightTransactionResultByIndex retrieves the result (light representation) of the +// transaction at the given index within the specified block. +// Expected error returns during normal operations: +// - [storage.ErrNotFound] if no result of a transaction at `txIndex` in `blockID` is known func RetrieveLightTransactionResultByIndex(r storage.Reader, blockID flow.Identifier, txIndex uint32, transactionResult *flow.LightTransactionResult) error { return RetrieveByKey(r, MakePrefix(codeLightTransactionResultIndex, blockID, txIndex), transactionResult) } -// LookupLightTransactionResultsByBlockIDUsingIndex retrieves all tx results for a block, but using -// tx_index index. This correctly handles cases of duplicate transactions within block. +// LookupLightTransactionResultsByBlockIDUsingIndex retrieves all tx results for the specified block. +// CAUTION: this function returns the empty list in case for block IDs without known results. +// No error returns are expected during normal operations. func LookupLightTransactionResultsByBlockIDUsingIndex(r storage.Reader, blockID flow.Identifier, txResults *[]flow.LightTransactionResult) error { - txErrIterFunc := func(keyCopy []byte, getValue func(destVal any) error) (bail bool, err error) { var val flow.LightTransactionResult err = getValue(&val) @@ -102,29 +126,63 @@ func LookupLightTransactionResultsByBlockIDUsingIndex(r storage.Reader, blockID return TraverseByPrefix(r, MakePrefix(codeLightTransactionResultIndex, blockID), txErrIterFunc, storage.DefaultIteratorOptions()) } -// BatchInsertTransactionResultErrorMessage inserts a transaction result error message by block ID and transaction ID -// into the database using a batch write. -func BatchInsertTransactionResultErrorMessage(w storage.Writer, blockID flow.Identifier, transactionResultErrorMessage *flow.TransactionResultErrorMessage) error { - return UpsertByKey(w, MakePrefix(codeTransactionResultErrorMessage, blockID, transactionResultErrorMessage.TransactionID), transactionResultErrorMessage) -} +// InsertAndIndexTransactionResultErrorMessages persists and indexes all transaction result error messages for the given blockID +// as part of the provided batch. The caller must acquire [storage.LockInsertTransactionResultErrMessage] and hold it until the +// write batch has been committed. +// It returns [storage.ErrAlreadyExists] if tx result error messages for the block already exist. +func InsertAndIndexTransactionResultErrorMessages( + lctx lockctx.Proof, rw storage.ReaderBatchWriter, + blockID flow.Identifier, + transactionResultErrorMessages []flow.TransactionResultErrorMessage, +) error { + if !lctx.HoldsLock(storage.LockInsertTransactionResultErrMessage) { + return fmt.Errorf("lock %s is not held", storage.LockInsertTransactionResultErrMessage) + } -// BatchIndexTransactionResultErrorMessage indexes a transaction result error message by index within the block using a -// batch write. -func BatchIndexTransactionResultErrorMessage(w storage.Writer, blockID flow.Identifier, transactionResultErrorMessage *flow.TransactionResultErrorMessage) error { - return UpsertByKey(w, MakePrefix(codeTransactionResultErrorMessageIndex, blockID, transactionResultErrorMessage.Index), transactionResultErrorMessage) + // ensure we don't overwrite existing tx result error messages for this block + prefix := MakePrefix(codeTransactionResultErrorMessage, blockID) + checkExists := func(key []byte) error { + return fmt.Errorf("transaction result error messages for block %s already exist: %w", blockID, storage.ErrAlreadyExists) + } + err := IterateKeysByPrefixRange(rw.GlobalReader(), prefix, prefix, checkExists) + if err != nil { + return err + } + + w := rw.Writer() + for _, txErrMsg := range transactionResultErrorMessages { + // insertTransactionResultErrorMessageByTxID inserts a transaction result error message by block ID and transaction ID + err := UpsertByKey(w, MakePrefix(codeTransactionResultErrorMessage, blockID, txErrMsg.TransactionID), &txErrMsg) + if err != nil { + return fmt.Errorf("cannot batch insert tx result error message: %w", err) + } + // indexTransactionResultErrorMessageBlockIDTxIndex indexes a transaction result error message by index within the block + err = UpsertByKey(w, MakePrefix(codeTransactionResultErrorMessageIndex, blockID, txErrMsg.Index), &txErrMsg) + if err != nil { + return fmt.Errorf("cannot batch index tx result error message: %w", err) + } + } + return nil } -// RetrieveTransactionResultErrorMessage retrieves a transaction result error message by block ID and transaction ID. +// RetrieveTransactionResultErrorMessage retrieves a transaction result error message of the specified transaction +// within the specified block. +// Expected error returns during normal operations: +// - [storage.ErrNotFound] if no result error message of a transaction with the specified ID in `blockID` is known func RetrieveTransactionResultErrorMessage(r storage.Reader, blockID flow.Identifier, transactionID flow.Identifier, transactionResultErrorMessage *flow.TransactionResultErrorMessage) error { return RetrieveByKey(r, MakePrefix(codeTransactionResultErrorMessage, blockID, transactionID), transactionResultErrorMessage) } -// RetrieveTransactionResultErrorMessageByIndex retrieves a transaction result error message by block ID and index. +// RetrieveTransactionResultErrorMessageByIndex retrieves the transaction result error message of the +// transaction at the given index within the specified block. +// Expected error returns during normal operations: +// - [storage.ErrNotFound] if no result of a transaction at `txIndex` in `blockID` is known func RetrieveTransactionResultErrorMessageByIndex(r storage.Reader, blockID flow.Identifier, txIndex uint32, transactionResultErrorMessage *flow.TransactionResultErrorMessage) error { return RetrieveByKey(r, MakePrefix(codeTransactionResultErrorMessageIndex, blockID, txIndex), transactionResultErrorMessage) } // TransactionResultErrorMessagesExists checks whether tx result error messages exist in the database. +// No error returns are expected during normal operations. func TransactionResultErrorMessagesExists(r storage.Reader, blockID flow.Identifier, blockExists *bool) error { exists, err := KeyExists(r, MakePrefix(codeTransactionResultErrorMessageIndex, blockID)) if err != nil { @@ -134,8 +192,10 @@ func TransactionResultErrorMessagesExists(r storage.Reader, blockID flow.Identif return nil } -// LookupTransactionResultErrorMessagesByBlockIDUsingIndex retrieves all tx result error messages for a block, by using -// tx_index index. This correctly handles cases of duplicate transactions within block. +// LookupTransactionResultErrorMessagesByBlockIDUsingIndex retrieves the transaction result error messages of all +// failed transactions for the specified block. +// CAUTION: This method returns an empty slice if transaction results/errors for the block are not indexed yet OR if the block does not have any errors. +// No error returns are expected during normal operations. func LookupTransactionResultErrorMessagesByBlockIDUsingIndex(r storage.Reader, blockID flow.Identifier, txResultErrorMessages *[]flow.TransactionResultErrorMessage) error { txErrIterFunc := func(keyCopy []byte, getValue func(destVal any) error) (bail bool, err error) { var val flow.TransactionResultErrorMessage diff --git a/storage/operation/transaction_results_test.go b/storage/operation/transaction_results_test.go new file mode 100644 index 00000000000..bdd075570a4 --- /dev/null +++ b/storage/operation/transaction_results_test.go @@ -0,0 +1,30 @@ +package operation_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/storage" + "github.com/onflow/flow-go/storage/operation" + "github.com/onflow/flow-go/storage/operation/dbtest" + "github.com/onflow/flow-go/utils/unittest" +) + +// TestRetrieveAllTxResultsForBlock verifies the working of persisting, indexing and retrieving +// [flow.LightTransactionResult] by block, transaction ID, and transaction index. +func TestRetrieveAllTxResultsForBlock(t *testing.T) { + t.Run("looking up transaction results for unknown block yields empty list", func(t *testing.T) { + dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { + unknownBlockID := unittest.IdentifierFixture() + transactionResults := make([]flow.LightTransactionResult, 0) + + err := db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return operation.LookupLightTransactionResultsByBlockIDUsingIndex(rw.GlobalReader(), unknownBlockID, &transactionResults) + }) + require.NoError(t, err) + require.Empty(t, transactionResults) + }) + }) +} diff --git a/storage/store/light_transaction_results.go b/storage/store/light_transaction_results.go index dd8cab8e29a..6292ebbfe30 100644 --- a/storage/store/light_transaction_results.go +++ b/storage/store/light_transaction_results.go @@ -1,7 +1,7 @@ package store import ( - "fmt" + "github.com/jordanschalm/lockctx" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" @@ -71,19 +71,15 @@ func NewLightTransactionResults(collector module.CacheMetrics, db storage.DB, tr } } -func (tr *LightTransactionResults) BatchStore(blockID flow.Identifier, transactionResults []flow.LightTransactionResult, rw storage.ReaderBatchWriter) error { - w := rw.Writer() - - for i, result := range transactionResults { - err := operation.BatchInsertLightTransactionResult(w, blockID, &result) - if err != nil { - return fmt.Errorf("cannot batch insert tx result: %w", err) - } - - err = operation.BatchIndexLightTransactionResult(w, blockID, uint32(i), &result) - if err != nil { - return fmt.Errorf("cannot batch index tx result: %w", err) - } +// BatchStore persists and indexes all transaction results (light representation) for the given blockID +// as part of the provided batch. The caller must acquire [storage.LockInsertLightTransactionResult] and +// hold it until the write batch has been committed. +// It returns [storage.ErrAlreadyExists] if light transaction results for the block already exist. +func (tr *LightTransactionResults) BatchStore(lctx lockctx.Proof, rw storage.ReaderBatchWriter, blockID flow.Identifier, transactionResults []flow.LightTransactionResult) error { + // requires [storage.LockInsertLightTransactionResult] + err := operation.InsertAndIndexLightTransactionResults(lctx, rw, blockID, transactionResults) + if err != nil { + return err } storage.OnCommitSucceed(rw, func() { @@ -103,11 +99,10 @@ func (tr *LightTransactionResults) BatchStore(blockID flow.Identifier, transacti return nil } -func (tr *LightTransactionResults) BatchStoreBadger(blockID flow.Identifier, transactionResults []flow.LightTransactionResult, batch storage.BatchStorage) error { - panic("LightTransactionResults BatchStoreBadger not implemented") -} - // ByBlockIDTransactionID returns the transaction result for the given block ID and transaction ID +// +// Expected error returns during normal operation: +// - [storage.ErrNotFound] if light transaction result at given blockID wasn't found. func (tr *LightTransactionResults) ByBlockIDTransactionID(blockID flow.Identifier, txID flow.Identifier) (*flow.LightTransactionResult, error) { key := KeyFromBlockIDTransactionID(blockID, txID) transactionResult, err := tr.cache.Get(tr.db.Reader(), key) @@ -118,6 +113,9 @@ func (tr *LightTransactionResults) ByBlockIDTransactionID(blockID flow.Identifie } // ByBlockIDTransactionIndex returns the transaction result for the given blockID and transaction index +// +// Expected error returns during normal operation: +// - [storage.ErrNotFound] if light transaction result at given blockID and txIndex wasn't found. func (tr *LightTransactionResults) ByBlockIDTransactionIndex(blockID flow.Identifier, txIndex uint32) (*flow.LightTransactionResult, error) { key := KeyFromBlockIDIndex(blockID, txIndex) transactionResult, err := tr.indexCache.Get(tr.db.Reader(), key) @@ -128,6 +126,8 @@ func (tr *LightTransactionResults) ByBlockIDTransactionIndex(blockID flow.Identi } // ByBlockID gets all transaction results for a block, ordered by transaction index +// CAUTION: this function returns the empty list in case for block IDs without known results. +// No error returns are expected during normal operations. func (tr *LightTransactionResults) ByBlockID(blockID flow.Identifier) ([]flow.LightTransactionResult, error) { transactionResults, err := tr.blockCache.Get(tr.db.Reader(), blockID) if err != nil { diff --git a/storage/store/light_transaction_results_test.go b/storage/store/light_transaction_results_test.go index c3ea965ab72..c773959b8b5 100644 --- a/storage/store/light_transaction_results_test.go +++ b/storage/store/light_transaction_results_test.go @@ -3,6 +3,7 @@ package store_test import ( "testing" + "github.com/jordanschalm/lockctx" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/exp/rand" @@ -17,27 +18,32 @@ import ( func TestBatchStoringLightTransactionResults(t *testing.T) { dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { + lockManager := storage.NewTestingLockManager() metrics := metrics.NewNoopCollector() - store1 := store.NewLightTransactionResults(metrics, db, 1000) + txResultsStore := store.NewLightTransactionResults(metrics, db, 1000) blockID := unittest.IdentifierFixture() txResults := getLightTransactionResultsFixture(10) - t.Run("batch store1 results", func(t *testing.T) { - require.NoError(t, db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { - return store1.BatchStore(blockID, txResults, rw) + t.Run("batch txResultsStore results", func(t *testing.T) { + require.NoError(t, unittest.WithLock(t, lockManager, storage.LockInsertLightTransactionResult, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return txResultsStore.BatchStore(lctx, rw, blockID, txResults) + }) })) // add a results to a new block to validate they are not included in lookups - require.NoError(t, db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { - return store1.BatchStore(unittest.IdentifierFixture(), getLightTransactionResultsFixture(2), rw) + require.NoError(t, unittest.WithLock(t, lockManager, storage.LockInsertLightTransactionResult, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return txResultsStore.BatchStore(lctx, rw, unittest.IdentifierFixture(), getLightTransactionResultsFixture(2)) + }) })) }) t.Run("read results with cache", func(t *testing.T) { for _, txResult := range txResults { - actual, err := store1.ByBlockIDTransactionID(blockID, txResult.TransactionID) + actual, err := txResultsStore.ByBlockIDTransactionID(blockID, txResult.TransactionID) require.NoError(t, err) assert.Equal(t, txResult, *actual) } @@ -57,7 +63,7 @@ func TestBatchStoringLightTransactionResults(t *testing.T) { t.Run("cached and non-cached results are equal", func(t *testing.T) { // check retrieving by index from both cache and db for i := len(txResults) - 1; i >= 0; i-- { - actual, err := store1.ByBlockIDTransactionIndex(blockID, uint32(i)) + actual, err := txResultsStore.ByBlockIDTransactionIndex(blockID, uint32(i)) require.NoError(t, err) assert.Equal(t, txResults[i], *actual) @@ -68,7 +74,7 @@ func TestBatchStoringLightTransactionResults(t *testing.T) { }) t.Run("read all results for block", func(t *testing.T) { - actuals, err := store1.ByBlockID(blockID) + actuals, err := txResultsStore.ByBlockID(blockID) require.NoError(t, err) assert.Equal(t, len(txResults), len(actuals)) @@ -82,20 +88,108 @@ func TestBatchStoringLightTransactionResults(t *testing.T) { func TestReadingNotStoredLightTransactionResults(t *testing.T) { dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { metrics := metrics.NewNoopCollector() - store1 := store.NewLightTransactionResults(metrics, db, 1000) + txResultsStore := store.NewLightTransactionResults(metrics, db, 1000) blockID := unittest.IdentifierFixture() txID := unittest.IdentifierFixture() txIndex := rand.Uint32() - _, err := store1.ByBlockIDTransactionID(blockID, txID) + _, err := txResultsStore.ByBlockIDTransactionID(blockID, txID) assert.ErrorIs(t, err, storage.ErrNotFound) - _, err = store1.ByBlockIDTransactionIndex(blockID, txIndex) + _, err = txResultsStore.ByBlockIDTransactionIndex(blockID, txIndex) assert.ErrorIs(t, err, storage.ErrNotFound) }) } +// Test that attempting to batch store light transaction results for a block ID that already exists +// results in a [storage.ErrAlreadyExists] error, and that the original data remains unchanged. +func TestBatchStoreLightTransactionResultsErrAlreadyExists(t *testing.T) { + dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { + lockManager := storage.NewTestingLockManager() + metrics := metrics.NewNoopCollector() + txResultsStore := store.NewLightTransactionResults(metrics, db, 1000) + + blockID := unittest.IdentifierFixture() + txResults := getLightTransactionResultsFixture(3) + + // First batch store should succeed + err := unittest.WithLock(t, lockManager, storage.LockInsertLightTransactionResult, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return txResultsStore.BatchStore(lctx, rw, blockID, txResults) + }) + }) + require.NoError(t, err) + + // Second batch store with the same blockID should fail with ErrAlreadyExists + duplicateTxResults := getLightTransactionResultsFixture(2) + err = unittest.WithLock(t, lockManager, storage.LockInsertLightTransactionResult, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return txResultsStore.BatchStore(lctx, rw, blockID, duplicateTxResults) + }) + }) + require.Error(t, err) + require.ErrorIs(t, err, storage.ErrAlreadyExists) + + // Verify that the original data is unchanged + actuals, err := txResultsStore.ByBlockID(blockID) + require.NoError(t, err) + require.Equal(t, len(txResults), len(actuals)) + for i := range txResults { + assert.Equal(t, txResults[i], actuals[i]) + } + }) +} + +// Test that attempting to batch store light transaction results without holding the required lock +// results in an error indicating the missing lock. The implementation should not conflate this error +// case with data for the same key already existing, ie. it should not return [storage.ErrAlreadyExists]. +func TestBatchStoreLightTransactionResultsMissingLock(t *testing.T) { + dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { + metrics := metrics.NewNoopCollector() + txResultsStore := store.NewLightTransactionResults(metrics, db, 1000) + + blockID := unittest.IdentifierFixture() + txResults := getLightTransactionResultsFixture(3) + + // Create a context without the required lock + lockManager := storage.NewTestingLockManager() + lctx := lockManager.NewContext() + defer lctx.Release() + + err := db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return txResultsStore.BatchStore(lctx, rw, blockID, txResults) + }) + require.Error(t, err) + require.NotErrorIs(t, err, storage.ErrAlreadyExists) + require.Contains(t, err.Error(), "lock_insert_light_transaction_result") + }) +} + +// Test that attempting to batch store light transaction results while holding the wrong lock +// results in an error indicating the incorrect lock. The implementation should not conflate this error +// case with data for the same key already existing, ie. it should not return [storage.ErrAlreadyExists]. +func TestBatchStoreLightTransactionResultsWrongLock(t *testing.T) { + dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { + lockManager := storage.NewTestingLockManager() + metrics := metrics.NewNoopCollector() + txResultsStore := store.NewLightTransactionResults(metrics, db, 1000) + + blockID := unittest.IdentifierFixture() + txResults := getLightTransactionResultsFixture(3) + + // Try to use the wrong lock + err := unittest.WithLock(t, lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return txResultsStore.BatchStore(lctx, rw, blockID, txResults) + }) + }) + require.Error(t, err) + require.NotErrorIs(t, err, storage.ErrAlreadyExists) + require.Contains(t, err.Error(), "lock_insert_light_transaction_result") + }) +} + func getLightTransactionResultsFixture(n int) []flow.LightTransactionResult { txResults := make([]flow.LightTransactionResult, 0, n) for i := 0; i < n; i++ { diff --git a/storage/store/transaction_result_error_messages.go b/storage/store/transaction_result_error_messages.go index 6f315216aea..e8238a6cc44 100644 --- a/storage/store/transaction_result_error_messages.go +++ b/storage/store/transaction_result_error_messages.go @@ -3,6 +3,8 @@ package store import ( "fmt" + "github.com/jordanschalm/lockctx" + "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/metrics" @@ -71,17 +73,16 @@ func NewTransactionResultErrorMessages(collector module.CacheMetrics, db storage } } -// Store will store transaction result error messages for the given block ID. -// -// No errors are expected during normal operation. -func (t *TransactionResultErrorMessages) Store(blockID flow.Identifier, transactionResultErrorMessages []flow.TransactionResultErrorMessage) error { +// Store persists and indexes all transaction result error messages for the given blockID. The caller must +// acquire [storage.LockInsertTransactionResultErrMessage] and hold it until the write batch has been committed. +// It returns [storage.ErrAlreadyExists] if tx result error messages for the block already exist. +func (t *TransactionResultErrorMessages) Store(lctx lockctx.Proof, blockID flow.Identifier, transactionResultErrorMessages []flow.TransactionResultErrorMessage) error { return t.db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { - return t.BatchStore(blockID, transactionResultErrorMessages, rw) + return t.BatchStore(lctx, rw, blockID, transactionResultErrorMessages) }) } // Exists returns true if transaction result error messages for the given ID have been stored. -// // No errors are expected during normal operation. func (t *TransactionResultErrorMessages) Exists(blockID flow.Identifier) (bool, error) { // if the block is in the cache, return true @@ -98,28 +99,23 @@ func (t *TransactionResultErrorMessages) Exists(blockID flow.Identifier) (bool, return exists, nil } -// BatchStore inserts a batch of transaction result error messages into a batch -// -// No errors are expected during normal operation. +// BatchStore persists and indexes all transaction result error messages for the given blockID as part +// of the provided batch. The caller must acquire [storage.LockInsertTransactionResultErrMessage] and +// hold it until the write batch has been committed. +// It returns [storage.ErrAlreadyExists] if tx result error messages for the block already exist. func (t *TransactionResultErrorMessages) BatchStore( + lctx lockctx.Proof, + rw storage.ReaderBatchWriter, blockID flow.Identifier, transactionResultErrorMessages []flow.TransactionResultErrorMessage, - batch storage.ReaderBatchWriter, ) error { - writer := batch.Writer() - for _, result := range transactionResultErrorMessages { - err := operation.BatchInsertTransactionResultErrorMessage(writer, blockID, &result) - if err != nil { - return fmt.Errorf("cannot batch insert tx result error message: %w", err) - } - - err = operation.BatchIndexTransactionResultErrorMessage(writer, blockID, &result) - if err != nil { - return fmt.Errorf("cannot batch index tx result error message: %w", err) - } + // requires [storage.LockInsertTransactionResultErrMessage] + err := operation.InsertAndIndexTransactionResultErrorMessages(lctx, rw, blockID, transactionResultErrorMessages) + if err != nil { + return err } - storage.OnCommitSucceed(batch, func() { + storage.OnCommitSucceed(rw, func() { for _, result := range transactionResultErrorMessages { key := KeyFromBlockIDTransactionID(blockID, result.TransactionID) // cache for each transaction, so that it's faster to retrieve diff --git a/storage/store/transaction_result_error_messages_test.go b/storage/store/transaction_result_error_messages_test.go index 02238f0138c..5a779ec6ab5 100644 --- a/storage/store/transaction_result_error_messages_test.go +++ b/storage/store/transaction_result_error_messages_test.go @@ -4,6 +4,7 @@ import ( "fmt" "testing" + "github.com/jordanschalm/lockctx" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/exp/rand" @@ -19,18 +20,19 @@ import ( func TestStoringTransactionResultErrorMessages(t *testing.T) { dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { + lockManager := storage.NewTestingLockManager() metrics := metrics.NewNoopCollector() - store1 := store.NewTransactionResultErrorMessages(metrics, db, 1000) + txErrMsgStore := store.NewTransactionResultErrorMessages(metrics, db, 1000) blockID := unittest.IdentifierFixture() // test db Exists by block id - exists, err := store1.Exists(blockID) + exists, err := txErrMsgStore.Exists(blockID) require.NoError(t, err) require.False(t, exists) // check retrieving by ByBlockID - messages, err := store1.ByBlockID(blockID) + messages, err := txErrMsgStore.ByBlockID(blockID) require.NoError(t, err) require.Nil(t, messages) @@ -44,30 +46,32 @@ func TestStoringTransactionResultErrorMessages(t *testing.T) { } txErrorMessages = append(txErrorMessages, expected) } - err = store1.Store(blockID, txErrorMessages) + err = unittest.WithLock(t, lockManager, storage.LockInsertTransactionResultErrMessage, func(lctx lockctx.Context) error { + return txErrMsgStore.Store(lctx, blockID, txErrorMessages) + }) require.NoError(t, err) // test db Exists by block id - exists, err = store1.Exists(blockID) + exists, err = txErrMsgStore.Exists(blockID) require.NoError(t, err) require.True(t, exists) // check retrieving by ByBlockIDTransactionID for _, txErrorMessage := range txErrorMessages { - actual, err := store1.ByBlockIDTransactionID(blockID, txErrorMessage.TransactionID) + actual, err := txErrMsgStore.ByBlockIDTransactionID(blockID, txErrorMessage.TransactionID) require.NoError(t, err) assert.Equal(t, txErrorMessage, *actual) } // check retrieving by ByBlockIDTransactionIndex for _, txErrorMessage := range txErrorMessages { - actual, err := store1.ByBlockIDTransactionIndex(blockID, txErrorMessage.Index) + actual, err := txErrMsgStore.ByBlockIDTransactionIndex(blockID, txErrorMessage.Index) require.NoError(t, err) assert.Equal(t, txErrorMessage, *actual) } // check retrieving by ByBlockID - actual, err := store1.ByBlockID(blockID) + actual, err := txErrMsgStore.ByBlockID(blockID) require.NoError(t, err) assert.Equal(t, txErrorMessages, actual) @@ -81,7 +85,7 @@ func TestStoringTransactionResultErrorMessages(t *testing.T) { // check retrieving by index from both cache and db for i, txErrorMessage := range txErrorMessages { - actual, err := store1.ByBlockIDTransactionIndex(blockID, txErrorMessage.Index) + actual, err := txErrMsgStore.ByBlockIDTransactionIndex(blockID, txErrorMessage.Index) require.NoError(t, err) assert.Equal(t, txErrorMessages[i], *actual) @@ -95,16 +99,147 @@ func TestStoringTransactionResultErrorMessages(t *testing.T) { func TestReadingNotStoreTransactionResultErrorMessage(t *testing.T) { dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { metrics := metrics.NewNoopCollector() - store1 := store.NewTransactionResultErrorMessages(metrics, db, 1000) + txErrMsgStore := store.NewTransactionResultErrorMessages(metrics, db, 1000) blockID := unittest.IdentifierFixture() txID := unittest.IdentifierFixture() txIndex := rand.Uint32() - _, err := store1.ByBlockIDTransactionID(blockID, txID) + _, err := txErrMsgStore.ByBlockIDTransactionID(blockID, txID) assert.ErrorIs(t, err, storage.ErrNotFound) - _, err = store1.ByBlockIDTransactionIndex(blockID, txIndex) + _, err = txErrMsgStore.ByBlockIDTransactionIndex(blockID, txIndex) assert.ErrorIs(t, err, storage.ErrNotFound) }) } + +// Test that attempting to batch store transaction result error messages for a block ID that already exists +// results in a [storage.ErrAlreadyExists] error, and that the original messages remain unchanged. +func TestBatchStoreTransactionResultErrorMessagesErrAlreadyExists(t *testing.T) { + lockManager := storage.NewTestingLockManager() + dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { + metrics := metrics.NewNoopCollector() + st := store.NewTransactionResultErrorMessages(metrics, db, 1000) + + blockID := unittest.IdentifierFixture() + txResultErrMsgs := make([]flow.TransactionResultErrorMessage, 0) + for i := 0; i < 3; i++ { + expected := flow.TransactionResultErrorMessage{ + TransactionID: unittest.IdentifierFixture(), + ErrorMessage: fmt.Sprintf("a runtime error %d", i), + Index: uint32(i), + ExecutorID: unittest.IdentifierFixture(), + } + txResultErrMsgs = append(txResultErrMsgs, expected) + } + + // First batch store should succeed + err := unittest.WithLock(t, lockManager, storage.LockInsertTransactionResultErrMessage, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return st.BatchStore(lctx, rw, blockID, txResultErrMsgs) + }) + }) + require.NoError(t, err) + + // Second batch store with the same blockID should fail with ErrAlreadyExists + duplicateTxResultErrMsgs := make([]flow.TransactionResultErrorMessage, 0) + for i := 0; i < 2; i++ { + expected := flow.TransactionResultErrorMessage{ + TransactionID: unittest.IdentifierFixture(), + ErrorMessage: fmt.Sprintf("duplicate error %d", i), + Index: uint32(i), + ExecutorID: unittest.IdentifierFixture(), + } + duplicateTxResultErrMsgs = append(duplicateTxResultErrMsgs, expected) + } + + err = unittest.WithLock(t, lockManager, storage.LockInsertTransactionResultErrMessage, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return st.BatchStore(lctx, rw, blockID, duplicateTxResultErrMsgs) + }) + }) + require.Error(t, err) + require.ErrorIs(t, err, storage.ErrAlreadyExists) + + // Verify that the original transaction result error messages are still there and unchanged + for _, txResultErrMsg := range txResultErrMsgs { + actual, err := st.ByBlockIDTransactionID(blockID, txResultErrMsg.TransactionID) + require.NoError(t, err) + assert.Equal(t, txResultErrMsg, *actual) + } + + // Verify that the duplicate transaction result error messages were not stored + for _, txResultErrMsg := range duplicateTxResultErrMsgs { + _, err := st.ByBlockIDTransactionID(blockID, txResultErrMsg.TransactionID) + require.Error(t, err) + require.ErrorIs(t, err, storage.ErrNotFound) + } + }) +} + +// Test that attempting to batch store transaction result error messages without holding the required lock +// results in an error indicating the missing lock. The implementation should not conflate this error +// case with data for the same key already existing, ie. it should not return [storage.ErrAlreadyExists]. +func TestBatchStoreTransactionResultErrorMessagesMissingLock(t *testing.T) { + dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { + metrics := metrics.NewNoopCollector() + st := store.NewTransactionResultErrorMessages(metrics, db, 1000) + + blockID := unittest.IdentifierFixture() + txResultErrMsgs := make([]flow.TransactionResultErrorMessage, 0) + for i := 0; i < 3; i++ { + expected := flow.TransactionResultErrorMessage{ + TransactionID: unittest.IdentifierFixture(), + ErrorMessage: fmt.Sprintf("a runtime error %d", i), + Index: uint32(i), + ExecutorID: unittest.IdentifierFixture(), + } + txResultErrMsgs = append(txResultErrMsgs, expected) + } + + // Create a context without the required lock + lockManager := storage.NewTestingLockManager() + lctx := lockManager.NewContext() + defer lctx.Release() + + err := db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return st.BatchStore(lctx, rw, blockID, txResultErrMsgs) + }) + require.Error(t, err) + require.NotErrorIs(t, err, storage.ErrAlreadyExists) + require.Contains(t, err.Error(), "lock_insert_transaction_result_message") + }) +} + +// Test that attempting to batch store transaction result error messages while holding the wrong lock +// results in an error indicating the incorrect lock. The implementation should not conflate this error +// case with data for the same key already existing, ie. it should not return [storage.ErrAlreadyExists]. +func TestBatchStoreTransactionResultErrorMessagesWrongLock(t *testing.T) { + lockManager := storage.NewTestingLockManager() + dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { + metrics := metrics.NewNoopCollector() + st := store.NewTransactionResultErrorMessages(metrics, db, 1000) + + blockID := unittest.IdentifierFixture() + txResultErrMsgs := make([]flow.TransactionResultErrorMessage, 0) + for i := 0; i < 3; i++ { + expected := flow.TransactionResultErrorMessage{ + TransactionID: unittest.IdentifierFixture(), + ErrorMessage: fmt.Sprintf("a runtime error %d", i), + Index: uint32(i), + ExecutorID: unittest.IdentifierFixture(), + } + txResultErrMsgs = append(txResultErrMsgs, expected) + } + + // Try to use the wrong lock + err := unittest.WithLock(t, lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return st.BatchStore(lctx, rw, blockID, txResultErrMsgs) + }) + }) + require.Error(t, err) + require.NotErrorIs(t, err, storage.ErrAlreadyExists) + require.Contains(t, err.Error(), "lock_insert_transaction_result_message") + }) +} diff --git a/storage/store/transaction_results.go b/storage/store/transaction_results.go index a870fe65ba8..6b7bea3e019 100644 --- a/storage/store/transaction_results.go +++ b/storage/store/transaction_results.go @@ -233,7 +233,7 @@ func (tr *TransactionResults) BatchRemoveByBlockID(blockID flow.Identifier, batc saveBlockIDInBatchData(batch, batchDataKey, blockID) - return operation.BatchRemoveTransactionResultsByBlockID(blockID, batch) + return operation.RemoveTransactionResultsByBlockID(blockID, batch) } func saveBlockIDInBatchData(batch storage.ReaderBatchWriter, batchDataKey string, blockID flow.Identifier) { diff --git a/storage/transaction_result_error_messages.go b/storage/transaction_result_error_messages.go index a573bbae8a2..73e71507d1e 100644 --- a/storage/transaction_result_error_messages.go +++ b/storage/transaction_result_error_messages.go @@ -1,28 +1,48 @@ package storage -import "github.com/onflow/flow-go/model/flow" +import ( + "github.com/jordanschalm/lockctx" + + "github.com/onflow/flow-go/model/flow" +) // TransactionResultErrorMessagesReader represents persistent storage read operations for transaction result error messages type TransactionResultErrorMessagesReader interface { // Exists returns true if transaction result error messages for the given ID have been stored. // + // Note that transaction error messages are auxiliary data provided by the Execution Nodes on a goodwill basis and + // not protected by the protocol. Execution Error messages might be non-deterministic, i.e. potentially different + // for different execution nodes. + // // No errors are expected during normal operation. Exists(blockID flow.Identifier) (bool, error) // ByBlockIDTransactionID returns the transaction result error message for the given block ID and transaction ID. // + // Note that transaction error messages are auxiliary data provided by the Execution Nodes on a goodwill basis and + // not protected by the protocol. Execution Error messages might be non-deterministic, i.e. potentially different + // for different execution nodes. + // // Expected errors during normal operation: - // - `storage.ErrNotFound` if no transaction error message is known at given block and transaction id. + // - [storage.ErrNotFound] if no transaction error message is known at given block and transaction id. ByBlockIDTransactionID(blockID flow.Identifier, transactionID flow.Identifier) (*flow.TransactionResultErrorMessage, error) // ByBlockIDTransactionIndex returns the transaction result error message for the given blockID and transaction index. // + // Note that transaction error messages are auxiliary data provided by the Execution Nodes on a goodwill basis and + // not protected by the protocol. Execution Error messages might be non-deterministic, i.e. potentially different + // for different execution nodes. + // // Expected errors during normal operation: - // - `storage.ErrNotFound` if no transaction error message is known at given block and transaction index. + // - [storage.ErrNotFound] if no transaction error message is known at given block and transaction index. ByBlockIDTransactionIndex(blockID flow.Identifier, txIndex uint32) (*flow.TransactionResultErrorMessage, error) // ByBlockID gets all transaction result error messages for a block, ordered by transaction index. - // Note: This method will return an empty slice both if the block is not indexed yet and if the block does not have any errors. + // CAUTION: This method will return an empty slice both if the block is not indexed yet and if the block does not have any errors. + // + // Note that transaction error messages are auxiliary data provided by the Execution Nodes on a goodwill basis and + // not protected by the protocol. Execution Error messages might be non-deterministic, i.e. potentially different + // for different execution nodes. // // No errors are expected during normal operations. ByBlockID(id flow.Identifier) ([]flow.TransactionResultErrorMessage, error) @@ -32,13 +52,14 @@ type TransactionResultErrorMessagesReader interface { type TransactionResultErrorMessages interface { TransactionResultErrorMessagesReader - // Store will store transaction result error messages for the given block ID. - // - // No errors are expected during normal operation. - Store(blockID flow.Identifier, transactionResultErrorMessages []flow.TransactionResultErrorMessage) error + // Store persists and indexes all transaction result error messages for the given blockID. The caller must + // acquire [storage.LockInsertTransactionResultErrMessage] and hold it until the write batch has been committed. + // It returns [storage.ErrAlreadyExists] if tx result error messages for the block already exist. + Store(lctx lockctx.Proof, blockID flow.Identifier, transactionResultErrorMessages []flow.TransactionResultErrorMessage) error - // BatchStore inserts a batch of transaction result error messages into a batch - // - // No errors are expected during normal operation. - BatchStore(blockID flow.Identifier, transactionResultErrorMessages []flow.TransactionResultErrorMessage, batch ReaderBatchWriter) error + // BatchStore persists and indexes all transaction result error messages for the given blockID as part + // of the provided batch. The caller must acquire [storage.LockInsertTransactionResultErrMessage] and + // hold it until the write batch has been committed. + // It returns [storage.ErrAlreadyExists] if tx result error messages for the block already exist. + BatchStore(lctx lockctx.Proof, rw ReaderBatchWriter, blockID flow.Identifier, transactionResultErrorMessages []flow.TransactionResultErrorMessage) error }