Merged
56 commits
af1c3e6
refactor index tx error message
zhangchiqing Oct 8, 2025
d86569f
refactor index tx error message
zhangchiqing Oct 8, 2025
af54bfe
fix store tests
zhangchiqing Oct 8, 2025
b413203
refactor tx error message
zhangchiqing Oct 8, 2025
a1a371d
add test case
zhangchiqing Oct 8, 2025
157d4ec
fix tests
zhangchiqing Oct 8, 2025
43ef330
fix tests
zhangchiqing Oct 8, 2025
155220d
fix tests
zhangchiqing Oct 8, 2025
e666a2a
fix lint
zhangchiqing Oct 8, 2025
5277616
fix admin tests
zhangchiqing Oct 8, 2025
28b4f19
handle already exists error
zhangchiqing Oct 8, 2025
bbd6ed9
fix tests
zhangchiqing Oct 8, 2025
af6427a
handle already exists error
zhangchiqing Oct 8, 2025
4eb3b45
fix optimistic sync core store operation
zhangchiqing Oct 8, 2025
45524d0
fix optimistic sync persister block
zhangchiqing Oct 8, 2025
1ee701b
fix tests
zhangchiqing Oct 8, 2025
79751ac
Merge branch 'master' into leo/refactor-index-tx-err-msg
zhangchiqing Oct 8, 2025
e2039ac
fix tests
zhangchiqing Oct 8, 2025
c9563fd
fix optimistic sync core
zhangchiqing Oct 8, 2025
6dddf0a
fix mocks
zhangchiqing Oct 8, 2025
346cdae
fix mocks
zhangchiqing Oct 8, 2025
f07a652
fix tests
zhangchiqing Oct 8, 2025
e8036e7
Apply suggestions from code review
zhangchiqing Oct 10, 2025
190c46d
rename operation functions
zhangchiqing Oct 10, 2025
9bcdb06
fix Persist transaction error message
zhangchiqing Oct 10, 2025
147f3cc
Fix lint
zhangchiqing Oct 10, 2025
9c36182
rename function names
zhangchiqing Oct 10, 2025
3fc2a9a
rename to txErrMsgStore
zhangchiqing Oct 10, 2025
eefd772
Merge branch 'master' into leo/refactor-index-tx-err-msg
zhangchiqing Oct 10, 2025
dd6cfa2
update lock policy
zhangchiqing Oct 10, 2025
780e20a
extended documentation, especially error returns
AlexHentschel Oct 10, 2025
4e11ecf
extended documentation, especially error returns
AlexHentschel Oct 11, 2025
cafb925
added test
AlexHentschel Oct 11, 2025
ac316dd
more error documentation
AlexHentschel Oct 11, 2025
64dfc09
polishing doc
AlexHentschel Oct 11, 2025
4ac5c28
added minor documentation and additional error check to test `storage…
AlexHentschel Oct 11, 2025
ac296c1
polishing goDoc
AlexHentschel Oct 11, 2025
9d0cbdb
added deprecation notice
AlexHentschel Oct 11, 2025
51a1895
polished goDoc
AlexHentschel Oct 11, 2025
c20915b
polished goDoc
AlexHentschel Oct 11, 2025
9a21aea
added cautionary statement regarding missing error doc
AlexHentschel Oct 11, 2025
127db87
extending documentation
AlexHentschel Oct 11, 2025
015edcc
moved Testing Lock manager's initialization very close to the init of…
AlexHentschel Oct 11, 2025
73c6514
marginal code consolidation
AlexHentschel Oct 11, 2025
9d9a747
remove review comments
zhangchiqing Oct 14, 2025
6f77ad3
Merge remote-tracking branch 'origin/leo/refactor-index-tx-err-msg' i…
zhangchiqing Oct 14, 2025
4854931
remove deprecated methods
zhangchiqing Oct 14, 2025
a19f958
update comments
zhangchiqing Oct 14, 2025
348284d
add tests for light transaction results store
zhangchiqing Oct 14, 2025
7636d0b
check locks held in tests
zhangchiqing Oct 14, 2025
e645c88
update optimistic sync core impl tests to verify locks held
zhangchiqing Oct 14, 2025
fa97bbb
Merge branch 'master' into leo/refactor-index-tx-err-msg
zhangchiqing Oct 15, 2025
94a08b5
fix mocks
zhangchiqing Oct 15, 2025
ee7a627
update tests
zhangchiqing Oct 15, 2025
c51b871
fix lint
zhangchiqing Oct 15, 2025
b7918aa
Merge branch 'master' into leo/refactor-index-tx-err-msg
zhangchiqing Oct 15, 2025
4 changes: 3 additions & 1 deletion admin/commands/storage/backfill_tx_error_messages_test.go
@@ -77,6 +77,7 @@ func TestBackfillTxErrorMessages(t *testing.T) {
func (suite *BackfillTxErrorMessagesSuite) SetupTest() {
suite.log = zerolog.New(os.Stderr)

lockManager := storage.NewTestingLockManager()
suite.state = new(protocolmock.State)
suite.headers = new(storagemock.Headers)
suite.receipts = new(storagemock.ExecutionReceipts)
@@ -160,6 +161,7 @@ func (suite *BackfillTxErrorMessagesSuite) SetupTest() {
errorMessageProvider,
suite.txErrorMessages,
executionNodeIdentitiesProvider,
lockManager,
)

suite.command = NewBackfillTxErrorMessagesCommand(
@@ -532,7 +534,7 @@ func (suite *BackfillTxErrorMessagesSuite) mockStoreTxErrorMessages(
}
}

suite.txErrorMessages.On("Store", blockID, txErrorMessages).Return(nil).Once()
suite.txErrorMessages.On("Store", mock.Anything, blockID, txErrorMessages).Return(nil).Once()
}

// assertAllExpectations asserts that all the expectations set on various mocks are met,
1 change: 1 addition & 0 deletions cmd/access/node_builder/access_node_builder.go
@@ -2217,6 +2217,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
notNil(builder.txResultErrorMessageProvider),
builder.transactionResultErrorMessages,
notNil(builder.ExecNodeIdentitiesProvider),
node.StorageLockMgr,
)
}

@@ -4,6 +4,7 @@ import (
"context"
"fmt"

"github.com/jordanschalm/lockctx"
"github.com/rs/zerolog"

"github.com/onflow/flow-go/engine/access/rpc/backend/transactions/error_messages"
@@ -24,6 +25,7 @@ type TxErrorMessagesCore struct {
txErrorMessageProvider error_messages.Provider
transactionResultErrorMessages storage.TransactionResultErrorMessages
execNodeIdentitiesProvider *commonrpc.ExecutionNodeIdentitiesProvider
lockManager storage.LockManager
}

// NewTxErrorMessagesCore creates a new instance of TxErrorMessagesCore.
@@ -32,12 +34,14 @@ func NewTxErrorMessagesCore(
txErrorMessageProvider error_messages.Provider,
transactionResultErrorMessages storage.TransactionResultErrorMessages,
execNodeIdentitiesProvider *commonrpc.ExecutionNodeIdentitiesProvider,
lockManager storage.LockManager,
) *TxErrorMessagesCore {
return &TxErrorMessagesCore{
log: log.With().Str("module", "tx_error_messages_core").Logger(),
txErrorMessageProvider: txErrorMessageProvider,
transactionResultErrorMessages: transactionResultErrorMessages,
execNodeIdentitiesProvider: execNodeIdentitiesProvider,
lockManager: lockManager,
}
}

@@ -62,6 +66,15 @@ func (c *TxErrorMessagesCore) FetchErrorMessages(ctx context.Context, blockID fl
return c.FetchErrorMessagesByENs(ctx, blockID, execNodes)
}

// FetchErrorMessagesByENs requests the transaction result error messages for the specified block ID from
// any of the given execution nodes and persists them once retrieved. This function blocks until ingestion
// of the tx error messages has completed or failed.
//
// Note that transaction error messages are auxiliary data provided by the Execution Nodes on a goodwill basis and
// are not protected by the protocol. Error messages might be non-deterministic, i.e. potentially different
// for different execution nodes. Hence, we also persist which execution node (`execNode`) provided the error message.
//
// It returns [storage.ErrAlreadyExists] if tx result error messages for the block already exist.
func (c *TxErrorMessagesCore) FetchErrorMessagesByENs(
ctx context.Context,
blockID flow.Identifier,
@@ -71,7 +84,6 @@ func (c *TxErrorMessagesCore) FetchErrorMessagesByENs(
if err != nil {
return fmt.Errorf("could not check existance of transaction result error messages: %w", err)
}

if exists {
return nil
}
@@ -100,14 +112,10 @@ func (c *TxErrorMessagesCore) FetchErrorMessagesByENs(
return nil
}

// storeTransactionResultErrorMessages stores the transaction result error messages for a given block ID.
// storeTransactionResultErrorMessages persists and indexes all transaction result error messages for the given blockID.
// The caller must acquire [storage.LockInsertTransactionResultErrMessage] and hold it until the write batch has been committed.
//
// Parameters:
// - blockID: The identifier of the block for which the error messages are to be stored.
// - errorMessagesResponses: A slice of responses containing the error messages to be stored.
// - execNode: The execution node associated with the error messages.
//
// No errors are expected during normal operation.
// It returns [storage.ErrAlreadyExists] if tx result error messages for the block already exist.
func (c *TxErrorMessagesCore) storeTransactionResultErrorMessages(
blockID flow.Identifier,
errorMessagesResponses []*execproto.GetTransactionErrorMessagesResponse_Result,
@@ -124,7 +132,9 @@ func (c *TxErrorMessagesCore) storeTransactionResultErrorMessages(
errorMessages = append(errorMessages, errorMessage)
}

err := c.transactionResultErrorMessages.Store(blockID, errorMessages)
err := storage.WithLock(c.lockManager, storage.LockInsertTransactionResultErrMessage, func(lctx lockctx.Context) error {
return c.transactionResultErrorMessages.Store(lctx, blockID, errorMessages)
})
if err != nil {
return fmt.Errorf("failed to store transaction error messages: %w", err)
}
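For orientation, here is the storage-side half of that contract as a minimal sketch (not code from this PR: the `txErrMsgStore` receiver, the error text, and the elided write logic are illustrative; only the lock ID, the `lockctx.Proof` check, and the `storage.ErrAlreadyExists` contract come from the changes above):

```go
package store

import (
	"fmt"

	"github.com/jordanschalm/lockctx"

	"github.com/onflow/flow-go/model/flow"
	"github.com/onflow/flow-go/storage"
)

type txErrMsgStore struct {
	// database handle omitted in this sketch
}

// Store persists and indexes all transaction result error messages for the given block.
// It rejects callers that do not hold LockInsertTransactionResultErrMessage, and is
// expected to return storage.ErrAlreadyExists if messages for the block were stored before.
func (s *txErrMsgStore) Store(lctx lockctx.Proof, blockID flow.Identifier, msgs []flow.TransactionResultErrorMessage) error {
	if !lctx.HoldsLock(storage.LockInsertTransactionResultErrMessage) {
		return fmt.Errorf("caller must hold lock %q", storage.LockInsertTransactionResultErrMessage)
	}
	// ... check for an existing entry (returning storage.ErrAlreadyExists),
	// then write and index all messages within one batch.
	return nil
}
```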
@@ -5,6 +5,7 @@ import (
"fmt"
"testing"

"github.com/jordanschalm/lockctx"
execproto "github.com/onflow/flow/protobuf/go/flow/execution"
"github.com/rs/zerolog"
"github.com/stretchr/testify/mock"
@@ -21,7 +22,8 @@ import (
"github.com/onflow/flow-go/module/irrecoverable"
syncmock "github.com/onflow/flow-go/module/state_synchronization/mock"
protocol "github.com/onflow/flow-go/state/protocol/mock"
storage "github.com/onflow/flow-go/storage/mock"
"github.com/onflow/flow-go/storage"
storagemock "github.com/onflow/flow-go/storage/mock"
"github.com/onflow/flow-go/utils/unittest"
)

@@ -37,13 +39,14 @@ type TxErrorMessagesCoreSuite struct {
params *protocol.Params
}

receipts *storage.ExecutionReceipts
txErrorMessages *storage.TransactionResultErrorMessages
lightTxResults *storage.LightTransactionResults
receipts *storagemock.ExecutionReceipts
txErrorMessages *storagemock.TransactionResultErrorMessages
lightTxResults *storagemock.LightTransactionResults

reporter *syncmock.IndexReporter
indexReporter *index.Reporter
txResultsIndex *index.TransactionResultsIndex
lockManager storage.LockManager

enNodeIDs flow.IdentityList
execClient *accessmock.ExecutionAPIClient
@@ -79,18 +82,21 @@ func (s *TxErrorMessagesCoreSuite) SetupTest() {
s.proto.params = protocol.NewParams(s.T())
s.execClient = accessmock.NewExecutionAPIClient(s.T())
s.connFactory = connectionmock.NewConnectionFactory(s.T())
s.receipts = storage.NewExecutionReceipts(s.T())
s.txErrorMessages = storage.NewTransactionResultErrorMessages(s.T())
s.receipts = storagemock.NewExecutionReceipts(s.T())
s.txErrorMessages = storagemock.NewTransactionResultErrorMessages(s.T())
s.rootBlock = unittest.Block.Genesis(flow.Emulator)
s.finalizedBlock = unittest.BlockWithParentFixture(s.rootBlock.ToHeader()).ToHeader()

s.lightTxResults = storage.NewLightTransactionResults(s.T())
s.lightTxResults = storagemock.NewLightTransactionResults(s.T())
s.reporter = syncmock.NewIndexReporter(s.T())
s.indexReporter = index.NewReporter()
err := s.indexReporter.Initialize(s.reporter)
s.Require().NoError(err)
s.txResultsIndex = index.NewTransactionResultsIndex(s.indexReporter, s.lightTxResults)

// Initialize lock manager for tests
s.lockManager = storage.NewTestingLockManager()

s.proto.state.On("Params").Return(s.proto.params)

// Mock the finalized root block header with height 0.
@@ -143,8 +149,11 @@ func (s *TxErrorMessagesCoreSuite) TestHandleTransactionResultErrorMessages() {
expectedStoreTxErrorMessages := createExpectedTxErrorMessages(resultsByBlockID, s.enNodeIDs.NodeIDs()[0])

// Mock the storage of the fetched error messages into the protocol database.
s.txErrorMessages.On("Store", blockId, expectedStoreTxErrorMessages).
Return(nil).Once()
s.txErrorMessages.On("Store", mock.Anything, blockId, expectedStoreTxErrorMessages).
Return(func(lctx lockctx.Proof, blockID flow.Identifier, transactionResultErrorMessages []flow.TransactionResultErrorMessage) error {
require.True(s.T(), lctx.HoldsLock(storage.LockInsertTransactionResultErrMessage))
return nil
}).Once()

core := s.initCore()
err := core.FetchErrorMessages(irrecoverableCtx, blockId)
@@ -228,8 +237,11 @@ func (s *TxErrorMessagesCoreSuite) TestHandleTransactionResultErrorMessages_Erro

// Simulate an error when attempting to store the fetched transaction error messages in storage.
expectedStoreTxErrorMessages := createExpectedTxErrorMessages(resultsByBlockID, s.enNodeIDs.NodeIDs()[0])
s.txErrorMessages.On("Store", blockId, expectedStoreTxErrorMessages).
Return(fmt.Errorf("storage error")).Once()
s.txErrorMessages.On("Store", mock.Anything, blockId, expectedStoreTxErrorMessages).
Return(func(lctx lockctx.Proof, blockID flow.Identifier, transactionResultErrorMessages []flow.TransactionResultErrorMessage) error {
require.True(s.T(), lctx.HoldsLock(storage.LockInsertTransactionResultErrMessage))
return fmt.Errorf("storage error")
}).Once()

core := s.initCore()
err := core.FetchErrorMessages(irrecoverableCtx, blockId)
@@ -268,6 +280,7 @@ func (s *TxErrorMessagesCoreSuite) initCore() *TxErrorMessagesCore {
errorMessageProvider,
s.txErrorMessages,
execNodeIdentitiesProvider,
s.lockManager,
)
return core
}
@@ -311,7 +324,7 @@ func mockTransactionResultsByBlock(count int) []flow.LightTransactionResult {

// setupReceiptsForBlock sets up mock execution receipts for a block and returns the receipts along
// with the identities of the execution nodes that processed them.
func setupReceiptsForBlock(receipts *storage.ExecutionReceipts, block *flow.Block, eNodeID flow.Identifier) {
func setupReceiptsForBlock(receipts *storagemock.ExecutionReceipts, block *flow.Block, eNodeID flow.Identifier) {
receipt1 := unittest.ReceiptForBlockFixture(block)
receipt1.ExecutorID = eNodeID
receipt2 := unittest.ReceiptForBlockFixture(block)
@@ -328,7 +341,7 @@
}

// setupReceiptsForBlockWithResult sets up mock execution receipts for a block with a specific execution result
func setupReceiptsForBlockWithResult(receipts *storage.ExecutionReceipts, executionResult *flow.ExecutionResult, executorIDs ...flow.Identifier) {
func setupReceiptsForBlockWithResult(receipts *storagemock.ExecutionReceipts, executionResult *flow.ExecutionResult, executorIDs ...flow.Identifier) {
receiptList := make(flow.ExecutionReceiptList, 0, len(executorIDs))
for _, enID := range executorIDs {
receiptList = append(receiptList, unittest.ExecutionReceiptFixture(
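The suite injects a real lock manager (storage.NewTestingLockManager), so the mocked Store can verify the proof it receives. The same primitives can also be exercised in isolation; a minimal sketch (the test name is illustrative, and it assumes lockctx.Context exposes HoldsLock via its embedded Proof):

```go
package example

import (
	"testing"

	"github.com/jordanschalm/lockctx"
	"github.com/stretchr/testify/require"

	"github.com/onflow/flow-go/storage"
)

func TestWithLockProvidesProof(t *testing.T) {
	lockManager := storage.NewTestingLockManager()

	// WithLock acquires the named lock, invokes the functor with a context that
	// doubles as a proof, and releases the lock when the functor returns.
	err := storage.WithLock(lockManager, storage.LockInsertTransactionResultErrMessage,
		func(lctx lockctx.Context) error {
			require.True(t, lctx.HoldsLock(storage.LockInsertTransactionResultErrMessage))
			return nil
		})
	require.NoError(t, err)
}
```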
@@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/jordanschalm/lockctx"
execproto "github.com/onflow/flow/protobuf/go/flow/execution"
"github.com/rs/zerolog"
"github.com/stretchr/testify/mock"
@@ -55,6 +56,7 @@ type TxErrorMessagesEngineSuite struct {
reporter *syncmock.IndexReporter
indexReporter *index.Reporter
txResultsIndex *index.TransactionResultsIndex
lockManager storage.LockManager

enNodeIDs flow.IdentityList
execClient *accessmock.ExecutionAPIClient
@@ -87,9 +89,13 @@ func (s *TxErrorMessagesEngineSuite) SetupTest() {
s.log = unittest.Logger()
s.metrics = metrics.NewNoopCollector()
s.ctx, s.cancel = context.WithCancel(context.Background())

// Initialize database and lock manager
pdb, dbDir := unittest.TempPebbleDB(s.T())
s.db = pebbleimpl.ToDB(pdb)
s.dbDir = dbDir
s.lockManager = storage.NewTestingLockManager()

// mock out protocol state
s.proto.state = protocol.NewFollowerState(s.T())
s.proto.snapshot = protocol.NewSnapshot(s.T())
@@ -177,6 +183,7 @@ func (s *TxErrorMessagesEngineSuite) initEngine(ctx irrecoverable.SignalerContex
errorMessageProvider,
s.txErrorMessages,
execNodeIdentitiesProvider,
s.lockManager,
)

eng, err := New(
@@ -245,10 +252,12 @@ func (s *TxErrorMessagesEngineSuite) TestOnFinalizedBlockHandleTxErrorMessages()
expectedStoreTxErrorMessages := createExpectedTxErrorMessages(resultsByBlockID, s.enNodeIDs.NodeIDs()[0])

// Mock the storage of the fetched error messages into the protocol database.
s.txErrorMessages.On("Store", blockID, expectedStoreTxErrorMessages).Return(nil).
s.txErrorMessages.On("Store", mock.Anything, blockID, expectedStoreTxErrorMessages).Return(nil).
Run(func(args mock.Arguments) {
// Ensure the test does not complete its work faster than necessary
wg.Done()
lctx, ok := args[0].(lockctx.Proof)
require.True(s.T(), ok, "expecting lock proof, but cast failed")
require.True(s.T(), lctx.HoldsLock(storage.LockInsertTransactionResultErrMessage))
wg.Done() // Ensure the test does not complete its work faster than necessary
}).Once()
}

2 changes: 1 addition & 1 deletion module/executiondatasync/optimistic_sync/core.go
@@ -335,7 +335,7 @@ func (c *CoreImpl) Persist() error {
stores.NewEventsStore(indexerData.Events, c.workingData.persistentEvents, c.executionResult.BlockID),
stores.NewResultsStore(indexerData.Results, c.workingData.persistentResults, c.executionResult.BlockID),
stores.NewCollectionsStore(indexerData.Collections, c.workingData.persistentCollections),
stores.NewTxResultErrMsgStore(c.workingData.txResultErrMsgsData, c.workingData.persistentTxResultErrMsgs, c.executionResult.BlockID),
stores.NewTxResultErrMsgStore(c.workingData.txResultErrMsgsData, c.workingData.persistentTxResultErrMsgs, c.executionResult.BlockID, c.workingData.lockManager),
stores.NewLatestSealedResultStore(c.workingData.latestPersistedSealedResult, c.executionResult.ID(), c.block.Height),
}
blockPersister := persisters.NewBlockPersister(
19 changes: 16 additions & 3 deletions module/executiondatasync/optimistic_sync/core_impl_test.go
@@ -6,6 +6,7 @@
"testing"
"time"

"github.com/jordanschalm/lockctx"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"

@@ -446,9 +447,21 @@ func (c *CoreImplSuite) TestCoreImpl_Persist() {
indexerData := core.workingData.indexerData
c.persistentRegisters.On("Store", flow.RegisterEntries(indexerData.Registers), tf.block.Height).Return(nil)
c.persistentEvents.On("BatchStore", blockID, []flow.EventsList{indexerData.Events}, mock.Anything).Return(nil)
c.persistentCollections.On("BatchStoreAndIndexByTransaction", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil)
c.persistentResults.On("BatchStore", blockID, indexerData.Results, mock.Anything).Return(nil)
c.persistentTxResultErrMsg.On("BatchStore", blockID, core.workingData.txResultErrMsgsData, mock.Anything).Return(nil)
c.persistentCollections.On("BatchStoreAndIndexByTransaction",
mock.MatchedBy(func(lctx lockctx.Proof) bool {
return lctx.HoldsLock(storage.LockInsertCollection)
}),
mock.Anything, mock.Anything, mock.Anything).Return(nil, nil)
c.persistentResults.On("BatchStore",
mock.MatchedBy(func(lctx lockctx.Proof) bool {
return lctx.HoldsLock(storage.LockInsertLightTransactionResult)
}),
mock.Anything, blockID, indexerData.Results).Return(nil)
c.persistentTxResultErrMsg.On("BatchStore",
mock.MatchedBy(func(lctx lockctx.Proof) bool {
return lctx.HoldsLock(storage.LockInsertTransactionResultErrMessage)
}),
mock.Anything, blockID, core.workingData.txResultErrMsgsData).Return(nil)
c.latestPersistedSealedResult.On("BatchSet", tf.exeResult.ID(), tf.block.Height, mock.Anything).Return(nil)

err = core.Persist()
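The mock.MatchedBy assertions above repeat the same closure three times; a tiny helper could reduce the duplication (hypothetical, not part of the PR; it assumes the testify mock and lockctx imports already used in this test file):

```go
// holdsLock returns an argument matcher that accepts any lockctx.Proof
// currently holding the given lock.
func holdsLock(lockID string) interface{} {
	return mock.MatchedBy(func(lctx lockctx.Proof) bool {
		return lctx.HoldsLock(lockID)
	})
}
```

Usage would then read, e.g., `c.persistentResults.On("BatchStore", holdsLock(storage.LockInsertLightTransactionResult), mock.Anything, blockID, indexerData.Results).Return(nil)`.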
25 changes: 12 additions & 13 deletions module/executiondatasync/optimistic_sync/persisters/block.go
@@ -60,20 +60,19 @@ func (p *BlockPersister) Persist() error {
p.log.Debug().Msg("started to persist execution data")
start := time.Now()

lctx := p.lockManager.NewContext()
err := lctx.AcquireLock(storage.LockInsertCollection)
if err != nil {
	return fmt.Errorf("could not acquire lock for inserting light collections: %w", err)
}
defer lctx.Release()

err = p.protocolDB.WithReaderBatchWriter(func(batch storage.ReaderBatchWriter) error {
	for _, persister := range p.persisterStores {
		if err := persister.Persist(lctx, batch); err != nil {
			return err
		}
	}
	return nil
})
err := storage.WithLocks(p.lockManager, []string{
	storage.LockInsertCollection,
	storage.LockInsertLightTransactionResult,
	storage.LockInsertTransactionResultErrMessage,
}, func(lctx lockctx.Context) error {
	return p.protocolDB.WithReaderBatchWriter(func(batch storage.ReaderBatchWriter) error {
		for _, persister := range p.persisterStores {
			if err := persister.Persist(lctx, batch); err != nil {
				return err
			}
		}
		return nil
	})
})
zhangchiqing (Member, Author) commented:

I don't like this usage here: the block persister has to know which locks need to be acquired. The block persister is supposed to be ignorant of which DB operations the underlying individual persisters run, and therefore of the locks to acquire; it is only supposed to create a batch object and ensure it is committed.

Maybe we can revisit this when working on #7910. My idea is that we could let the BatchStore functor return a required lock ID along with the functor, so that the locker doesn't need to remember which lock to acquire.

Member replied:

I agree it's slightly awkward, but I think returning the expected lock alongside the functor could cause other problems:

• What if many functors return the same lock to the ignorant caller? The caller would need to be able to ensure it acquires each lock only once.
• If there are many different locks to acquire, we still need to carefully order persisterStores to make sure the locks are acquired in the right order.

Fundamentally, the layer at which locks are acquired does need to know something about which locks are being acquired. I think just having the upper layer explicitly acquire the needed locks is the simplest way to deal with this pattern.

AlexHentschel (Member) commented on Oct 11, 2025:

[Leo] "I don't like this usage here, that the block persister has to know what locks need to be acquired."

My 10 cents on this conversation:

• On the one hand, acquiring the locks one by one looks verbose. But I don't think this is a problem of the lock-proof pattern. Here is my reasoning:

  • The current BlockPersister implementation already documents very precisely that it is for persisting an execution result:

    // BlockPersister stores execution data for a single execution result into the database.

    Therefore, it is not surprising that BlockPersister also has to know which locks to acquire (everything required for persisting a result).

  • With the move to pebble, the lower-level storage layer no longer has the ability to self-sufficiently protect against illegal data changes. Now the business logic has to be involved by acquiring and holding locks.

    Components that require different locks simply don't satisfy the same API anymore. Higher-level business logic has to be aware of which locks to acquire; that is simply a consequence of the storage layer no longer having snapshot isolation for reads + writes.

• From my perspective, you can try to hide the reality that the type of lock that must be held is conceptually part of the interface (even though the compiler only enforces that some lock proof is given, and is oblivious to which lock). I think you thereby inadvertently create problems in other parts that then no longer have the information they need:

  • the order of locks cannot be guaranteed if no single party is responsible for acquiring the locks in one place
  • checks become more complicated to implement when locks are no longer required to be held at the time of the call, but only at the time the batch is committed

if err != nil {
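For concreteness, the alternative floated in the thread above might look roughly like the following (purely hypothetical; none of these names exist in the PR). Each store would surface the lock it requires alongside its persist functor, and the caller would then have to deduplicate and order the lock IDs before acquiring them, which is exactly the bookkeeping the reviewers point out would fall back onto the supposedly ignorant caller:

```go
import (
	"sort"

	"github.com/jordanschalm/lockctx"

	"github.com/onflow/flow-go/storage"
)

// lockedPersister pairs a persist functor with the lock it requires.
type lockedPersister struct {
	lockID  string
	persist func(lctx lockctx.Proof, batch storage.ReaderBatchWriter) error
}

// requiredLocks collects the distinct lock IDs in a canonical order, so the
// caller can acquire each lock exactly once and in a deadlock-free order.
func requiredLocks(persisters []lockedPersister) []string {
	seen := make(map[string]struct{}, len(persisters))
	locks := make([]string, 0, len(persisters))
	for _, p := range persisters {
		if _, ok := seen[p.lockID]; ok {
			continue
		}
		seen[p.lockID] = struct{}{}
		locks = append(locks, p.lockID)
	}
	sort.Strings(locks) // stand-in for the lock manager's real ordering policy
	return locks
}
```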
@@ -111,7 +111,7 @@ func (p *PersisterSuite) testWithDatabase() {
stores.NewEventsStore(p.indexerData.Events, events, p.executionResult.BlockID),
stores.NewResultsStore(p.indexerData.Results, results, p.executionResult.BlockID),
stores.NewCollectionsStore(p.indexerData.Collections, collections),
stores.NewTxResultErrMsgStore(p.txErrMsgs, txResultErrMsg, p.executionResult.BlockID),
stores.NewTxResultErrMsgStore(p.txErrMsgs, txResultErrMsg, p.executionResult.BlockID, lockManager),
stores.NewLatestSealedResultStore(latestPersistedSealedResult, p.executionResult.ID(), p.header.Height),
},
)
@@ -1,6 +1,7 @@
package stores

import (
"errors"
"fmt"

"github.com/jordanschalm/lockctx"
@@ -34,8 +35,16 @@ func NewEventsStore(
//
// No error returns are expected during normal operations
func (e *EventsStore) Persist(_ lockctx.Proof, batch storage.ReaderBatchWriter) error {
if err := e.persistedEvents.BatchStore(e.blockID, []flow.EventsList{e.data}, batch); err != nil {
err := e.persistedEvents.BatchStore(e.blockID, []flow.EventsList{e.data}, batch)
if err != nil {
if errors.Is(err, storage.ErrAlreadyExists) {
// CAUTION: here we assume that if something is already stored for our blockID, then the data is identical.
// This only holds true for sealed execution results, whose consistency has previously been verified by
// comparing the data's hash to commitments in the execution result.
return nil
}
return fmt.Errorf("could not add events to batch: %w", err)
}

return nil
}

Member commented:

Question: I assume this is anticipating future changes? If I am not mistaken, at the moment Events.BatchStore does not error; it just ignorantly overwrites the data.

In addition, the storage API is lacking error documentation, so I added the following TODO:

// BatchStore will store events for the given block ID in a given batch
// TODO: error documentation
BatchStore(blockID flow.Identifier, events []flow.EventsList, batch ReaderBatchWriter) error

Overall, I think we need to come back to the events and add overwrite-protection checks. After adding these checks, the code in the EventsStore here will probably be correct. Created issue #8034; maybe worthwhile to link the issue here in the code.

zhangchiqing (Member, Author) replied:

Yes, the refactor of the implementation has been addressed in this PR:
https://github.com/onflow/flow-go/pull/8005/files#diff-db1f3e68113391168d7f6bf516cfc5b9c2ac5c0d0f106751171f48b4da7d7678R19

So the error handling is added here first.
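A sketch of the overwrite protection anticipated in this thread (hypothetical; issue #8034 tracks the real implementation, and the reader/writer shapes below are simplified stand-ins for flow-go's batch interfaces, as is the key prefix):

```go
// insertEventsGuarded refuses to silently overwrite: it returns
// storage.ErrAlreadyExists when events for blockID are already present.
func insertEventsGuarded(
	has func(key []byte) (bool, error), // stand-in for a batch-global reader
	set func(key, val []byte) error, // stand-in for the batch writer
	blockID flow.Identifier,
	payload []byte,
) error {
	key := append([]byte{0x66 /* illustrative prefix */}, blockID[:]...)
	exists, err := has(key)
	if err != nil {
		return fmt.Errorf("could not check for existing events: %w", err)
	}
	if exists {
		return storage.ErrAlreadyExists
	}
	return set(key, payload)
}
```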