Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
93 commits
Select commit Hold shift + click to select a range
56789b8
add lockctx to save execution results operations
zhangchiqing Sep 24, 2025
5f38c5c
fix tests
zhangchiqing Sep 24, 2025
ae157d1
Fix import formatting in storage/operation files
zhangchiqing Sep 24, 2025
11ebaba
update mocks
zhangchiqing Sep 24, 2025
cf8fbc8
refactor indexing results
zhangchiqing Sep 24, 2025
e069fd9
adding lock to index events and light transaction results
zhangchiqing Sep 24, 2025
0c4785d
refactor tests
zhangchiqing Sep 24, 2025
4c35202
fix optimistic_sync persisters
zhangchiqing Sep 25, 2025
1c7271f
refactor IndexOwnExecutionResult
zhangchiqing Sep 25, 2025
89688a2
update mocks
zhangchiqing Sep 25, 2025
3b87400
fix inmemory stores
zhangchiqing Oct 3, 2025
87f056a
fix in memory indexer
zhangchiqing Oct 3, 2025
81ec9d3
add lock manager to an ingestion engine
zhangchiqing Oct 3, 2025
f85f951
fix access test
zhangchiqing Oct 3, 2025
060f438
fix tests for BatchIndex
zhangchiqing Oct 3, 2025
14dcb42
fix tests
zhangchiqing Oct 3, 2025
a731f69
fix linter
zhangchiqing Oct 3, 2025
726eaec
fix tests
zhangchiqing Oct 3, 2025
0820d94
remove results.Store replace with BatchStore
zhangchiqing Oct 3, 2025
e1c6cc7
refactor InsertResult
zhangchiqing Oct 3, 2025
9855b1e
fix mocks
zhangchiqing Oct 3, 2025
45dc92a
fix badger state
zhangchiqing Oct 3, 2025
ce2156e
fix tests
zhangchiqing Oct 3, 2025
32f653d
refactor BatchIndexBlockContainingCollectionGuarantees
zhangchiqing Oct 4, 2025
f47041d
use different lock id
zhangchiqing Oct 4, 2025
77127b3
Merge branch 'master' into leo/refactor-index-result
zhangchiqing Oct 4, 2025
bb10b84
update mocks
zhangchiqing Oct 4, 2025
0826270
fix execution tests
zhangchiqing Oct 6, 2025
42a4620
fix optimisic sync persister tests
zhangchiqing Oct 6, 2025
25c93b8
fix execution engine tests
zhangchiqing Oct 6, 2025
7ceae7d
fix access ingestion engine tests
zhangchiqing Oct 6, 2025
467cc95
fix lint and test cases
zhangchiqing Oct 6, 2025
1b1d0d9
fix mocks
zhangchiqing Oct 6, 2025
0cb7f08
fix indexer tests
zhangchiqing Oct 6, 2025
2b061bf
move context.Release
zhangchiqing Oct 6, 2025
3120db3
rename IndexOwnExecutionResult to IndexOwnOrSealedExecutionResult
zhangchiqing Oct 6, 2025
b620fec
add comments
zhangchiqing Oct 6, 2025
a83ff30
fix lock policy
zhangchiqing Oct 7, 2025
2cc0f55
change lock order
zhangchiqing Oct 7, 2025
c24a29c
fix builder test
zhangchiqing Oct 7, 2025
27bdbdc
refactor locks
zhangchiqing Oct 7, 2025
6fa63e7
fix tests
zhangchiqing Oct 7, 2025
08d0749
fix lint
zhangchiqing Oct 7, 2025
096e441
update lock
zhangchiqing Oct 7, 2025
4595bea
update tests
zhangchiqing Oct 7, 2025
0d166a9
fix execution state extraction test
zhangchiqing Oct 7, 2025
84384b6
remove LockIndexFinalizedBlock
zhangchiqing Oct 7, 2025
f802992
remove deprecated methods
zhangchiqing Oct 7, 2025
1379487
add consistency check for InsertAndIndexTransactionResults
zhangchiqing Oct 7, 2025
5213db5
handle already exists error
zhangchiqing Oct 7, 2025
5fe29ef
skip already exist error
zhangchiqing Oct 7, 2025
e0f954e
update mocks
zhangchiqing Oct 7, 2025
2359222
add test case
zhangchiqing Oct 7, 2025
bb43d95
add test to check lock is hold
zhangchiqing Oct 7, 2025
80bd5a6
add comments
zhangchiqing Oct 8, 2025
e66a0f4
refactor InsertEvent with InsertBlockEvents
zhangchiqing Oct 8, 2025
bc19eb6
Merge remote-tracking branch 'origin/leo/refactor-index-result' into …
zhangchiqing Oct 8, 2025
a458240
Merge branch 'master' into leo/refactor-index-result
zhangchiqing Oct 8, 2025
b87016a
fix core impl test
zhangchiqing Oct 8, 2025
f4cb249
refactor to use LockInsertServiceEvent
zhangchiqing Oct 8, 2025
a5e1096
remove events.Store method
zhangchiqing Oct 8, 2025
9f8690f
add LockIndexExecutionResult
zhangchiqing Oct 8, 2025
951967f
fix test
zhangchiqing Oct 8, 2025
4821ca3
fix execution state tests
zhangchiqing Oct 8, 2025
c458be6
remove outdated comments
zhangchiqing Oct 8, 2025
8e2be8a
add test case
zhangchiqing Oct 8, 2025
90ef8d5
update comments
zhangchiqing Oct 8, 2025
fe33e7f
update comments
zhangchiqing Oct 8, 2025
1cadc17
update lock policy
zhangchiqing Oct 8, 2025
9dafecc
add comments and remove unused methods
zhangchiqing Oct 8, 2025
03c159b
add existence check when inserting events
zhangchiqing Oct 8, 2025
7eca395
fix results tests
zhangchiqing Oct 8, 2025
7313207
update events tests
zhangchiqing Oct 8, 2025
f863089
fix tests
zhangchiqing Oct 8, 2025
bf3187d
fix persist block test
zhangchiqing Oct 8, 2025
e7e4059
fix lint
zhangchiqing Oct 8, 2025
d725fed
add comment to lock policy
zhangchiqing Oct 9, 2025
a0a2985
rename LockBootstrapping to LockInsertInstanceParams
zhangchiqing Oct 9, 2025
c2c67aa
Merge branch 'master' into leo/refactor-index-result
zhangchiqing Oct 15, 2025
41a54b8
fix lint
zhangchiqing Oct 15, 2025
a35a911
remove LockBootstrapping
zhangchiqing Oct 15, 2025
6dd2401
fix tests and locks policy
zhangchiqing Oct 15, 2025
6a91a9e
fix tests
zhangchiqing Oct 15, 2025
87758a9
refactor lock policys
zhangchiqing Oct 16, 2025
04414c1
Merge branch 'master' into leo/refactor-index-result
zhangchiqing Oct 16, 2025
bcf1655
Apply suggestions from code review
zhangchiqing Oct 17, 2025
65c3ceb
rename to LockInsertMyReceipt
zhangchiqing Oct 17, 2025
8ece701
rename to LockIndexBlockByPayloadGuarantees
zhangchiqing Oct 17, 2025
55e86da
rename variables in BatchIndexBlockContainingCollectionGuarantees
zhangchiqing Oct 17, 2025
5bd5581
fix lint
zhangchiqing Oct 17, 2025
50b4ba4
rename results operations
zhangchiqing Oct 17, 2025
8d8d964
add comments
zhangchiqing Oct 17, 2025
4800c80
Merge branch 'master' into leo/refactor-index-result
zhangchiqing Oct 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2239,6 +2239,8 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
node.EngineRegistry,
node.State,
node.Me,
node.StorageLockMgr,
node.ProtocolDB,
node.Storage.Blocks,
node.Storage.Results,
node.Storage.Receipts,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestExtractExecutionState(t *testing.T) {
blockID := unittest.IdentifierFixture()
stateCommitment := unittest.StateCommitmentFixture()

err := unittest.WithLock(t, lockManager, storage.LockInsertOwnReceipt, func(lctx lockctx.Context) error {
err := unittest.WithLock(t, lockManager, storage.LockIndexStateCommitment, func(lctx lockctx.Context) error {
Copy link
Member Author

@zhangchiqing zhangchiqing Oct 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of reusing the same lock, I think we should acquire a lock that is specific to the data we are updating. That's why I changed to this new lock.

return storageDB.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
// Store the state commitment for the block ID
return operation.IndexStateCommitment(lctx, rw, blockID, stateCommitment)
Expand Down Expand Up @@ -139,7 +139,7 @@ func TestExtractExecutionState(t *testing.T) {
// generate random block and map it to state commitment
blockID := unittest.IdentifierFixture()

err = unittest.WithLock(t, lockManager, storage.LockInsertOwnReceipt, func(lctx lockctx.Context) error {
err = unittest.WithLock(t, lockManager, storage.LockIndexStateCommitment, func(lctx lockctx.Context) error {
return storageDB.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
return operation.IndexStateCommitment(lctx, rw, blockID, flow.StateCommitment(stateCommitment))
})
Expand Down
56 changes: 0 additions & 56 deletions cmd/util/cmd/reindex/cmd/results.go

This file was deleted.

40 changes: 0 additions & 40 deletions cmd/util/cmd/reindex/cmd/root.go

This file was deleted.

7 changes: 0 additions & 7 deletions cmd/util/cmd/reindex/main.go

This file was deleted.

2 changes: 0 additions & 2 deletions cmd/util/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
read_execution_state "github.com/onflow/flow-go/cmd/util/cmd/read-execution-state"
read_hotstuff "github.com/onflow/flow-go/cmd/util/cmd/read-hotstuff/cmd"
read_protocol_state "github.com/onflow/flow-go/cmd/util/cmd/read-protocol-state/cmd"
index_er "github.com/onflow/flow-go/cmd/util/cmd/reindex/cmd"
rollback_executed_height "github.com/onflow/flow-go/cmd/util/cmd/rollback-executed-height/cmd"
run_script "github.com/onflow/flow-go/cmd/util/cmd/run-script"
"github.com/onflow/flow-go/cmd/util/cmd/snapshot"
Expand Down Expand Up @@ -111,7 +110,6 @@ func addCommands() {
rootCmd.AddCommand(leaders.Cmd)
rootCmd.AddCommand(epochs.RootCmd)
rootCmd.AddCommand(edbs.RootCmd)
rootCmd.AddCommand(index_er.RootCmd)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This tool is no longer useful since this PR. Because now we don't allow multiple results to be stored for the same block, therefore we don't have multiple results to choose from which one to be reindexed with.

rootCmd.AddCommand(rollback_executed_height.Cmd)
rootCmd.AddCommand(read_execution_state.Cmd)
rootCmd.AddCommand(snapshot.Cmd)
Expand Down
22 changes: 20 additions & 2 deletions engine/access/access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,7 @@ func (suite *Suite) TestGetBlockByIDAndHeight() {

func (suite *Suite) TestGetExecutionResultByBlockID() {
suite.RunTest(func(handler *rpc.Handler, db storage.DB, all *store.All) {
lockManager := storage.NewTestingLockManager()

// test block1 get by ID
nonexistingID := unittest.IdentifierFixture()
Expand All @@ -553,8 +554,16 @@ func (suite *Suite) TestGetExecutionResultByBlockID() {
unittest.WithExecutionResultBlockID(blockID),
unittest.WithServiceEvents(3))

require.NoError(suite.T(), all.Results.Store(er))
require.NoError(suite.T(), all.Results.Index(blockID, er.ID()))
require.NoError(suite.T(), storage.WithLock(lockManager, storage.LockIndexExecutionResult, func(lctx lockctx.Context) error {
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
err := all.Results.BatchStore(er, rw)
if err != nil {
return err
}
// requires LockIndexExecutionResult
return all.Results.BatchIndex(lctx, rw, blockID, er.ID())
})
}))

assertResp := func(
resp *accessproto.ExecutionResultForBlockIDResponse,
Expand Down Expand Up @@ -626,6 +635,7 @@ func (suite *Suite) TestGetExecutionResultByBlockID() {
// is reported as sealed
func (suite *Suite) TestGetSealedTransaction() {
unittest.RunWithPebbleDB(suite.T(), func(pdb *pebble.DB) {
lockManager := storage.NewTestingLockManager()
db := pebbleimpl.ToDB(pdb)
all := store.InitAll(metrics.NewNoopCollector(), db)
enIdentities := unittest.IdentityListFixture(2, unittest.WithRole(flow.RoleExecution))
Expand Down Expand Up @@ -740,6 +750,8 @@ func (suite *Suite) TestGetSealedTransaction() {
suite.net,
suite.state,
suite.me,
lockManager,
db,
all.Blocks,
all.Results,
all.Receipts,
Expand Down Expand Up @@ -816,6 +828,7 @@ func (suite *Suite) TestGetSealedTransaction() {
// transaction ID, block ID, and collection ID.
func (suite *Suite) TestGetTransactionResult() {
unittest.RunWithPebbleDB(suite.T(), func(pdb *pebble.DB) {
lockManager := storage.NewTestingLockManager()
db := pebbleimpl.ToDB(pdb)
all := store.InitAll(metrics.NewNoopCollector(), db)
originID := unittest.IdentifierFixture()
Expand Down Expand Up @@ -963,6 +976,8 @@ func (suite *Suite) TestGetTransactionResult() {
suite.net,
suite.state,
suite.me,
lockManager,
db,
all.Blocks,
all.Results,
all.Receipts,
Expand Down Expand Up @@ -1135,6 +1150,7 @@ func (suite *Suite) TestGetTransactionResult() {
// the correct block id
func (suite *Suite) TestExecuteScript() {
unittest.RunWithPebbleDB(suite.T(), func(pdb *pebble.DB) {
lockManager := storage.NewTestingLockManager()
db := pebbleimpl.ToDB(pdb)
all := store.InitAll(metrics.NewNoopCollector(), db)
identities := unittest.IdentityListFixture(2, unittest.WithRole(flow.RoleExecution))
Expand Down Expand Up @@ -1229,6 +1245,8 @@ func (suite *Suite) TestExecuteScript() {
suite.net,
suite.state,
suite.me,
lockManager,
db,
all.Blocks,
all.Results,
all.Receipts,
Expand Down
35 changes: 26 additions & 9 deletions engine/access/ingestion/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"time"

"github.com/jordanschalm/lockctx"
"github.com/rs/zerolog"

"github.com/onflow/flow-go/consensus/hotstuff/model"
Expand Down Expand Up @@ -78,6 +79,8 @@ type Engine struct {

// storage
// FIX: remove direct DB access by substituting indexer module
db storage.DB
lockManager storage.LockManager
blocks storage.Blocks
executionReceipts storage.ExecutionReceipts
maxReceiptHeight uint64
Expand All @@ -101,6 +104,8 @@ func New(
net network.EngineRegistry,
state protocol.State,
me module.Local,
lockManager storage.LockManager,
db storage.DB,
blocks storage.Blocks,
executionResults storage.ExecutionResults,
executionReceipts storage.ExecutionReceipts,
Expand Down Expand Up @@ -133,6 +138,8 @@ func New(
log: log.With().Str("engine", "ingestion").Logger(),
state: state,
me: me,
lockManager: lockManager,
db: db,
blocks: blocks,
executionResults: executionResults,
executionReceipts: executionReceipts,
Expand Down Expand Up @@ -372,19 +379,29 @@ func (e *Engine) processFinalizedBlock(block *flow.Block) error {
// TODO: substitute an indexer module as layer between engine and storage

// index the block storage with each of the collection guarantee
err := e.blocks.IndexBlockContainingCollectionGuarantees(block.ID(), flow.GetIDs(block.Payload.Guarantees))
err := storage.WithLocks(e.lockManager, storage.LockGroupAccessFinalizingBlock, func(lctx lockctx.Context) error {
return e.db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
// requires [storage.LockIndexBlockByPayloadGuarantees] lock
err := e.blocks.BatchIndexBlockContainingCollectionGuarantees(lctx, rw, block.ID(), flow.GetIDs(block.Payload.Guarantees))
if err != nil {
return fmt.Errorf("could not index block for collections: %w", err)
}

// loop through seals and index ID -> result ID
for _, seal := range block.Payload.Seals {
// requires [storage.LockIndexExecutionResult] lock
err := e.executionResults.BatchIndex(lctx, rw, seal.BlockID, seal.ResultID)
if err != nil {
return fmt.Errorf("could not index block for execution result: %w", err)
}
}
return nil
})
})
if err != nil {
return fmt.Errorf("could not index block for collections: %w", err)
}

// loop through seals and index ID -> result ID
for _, seal := range block.Payload.Seals {
err := e.executionResults.Index(seal.BlockID, seal.ResultID)
if err != nil {
return fmt.Errorf("could not index block for execution result: %w", err)
}
}

e.collectionSyncer.RequestCollectionsForBlock(block.Height, block.Payload.Guarantees)
e.collectionExecutedMetric.BlockFinalized(block)

Expand Down
13 changes: 8 additions & 5 deletions engine/access/ingestion/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ func (s *Suite) SetupTest() {
s.receipts = new(storagemock.ExecutionReceipts)
s.transactions = new(storagemock.Transactions)
s.results = new(storagemock.ExecutionResults)
s.results.On("BatchIndex", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
collectionsToMarkFinalized := stdmap.NewTimes(100)
collectionsToMarkExecuted := stdmap.NewTimes(100)
blocksToMarkExecuted := stdmap.NewTimes(100)
Expand Down Expand Up @@ -213,6 +214,8 @@ func (s *Suite) initEngineAndSyncer(ctx irrecoverable.SignalerContext) (*Engine,
s.net,
s.proto.state,
s.me,
s.lockManager,
s.db,
s.blocks,
s.results,
s.receipts,
Expand Down Expand Up @@ -294,7 +297,7 @@ func (s *Suite) TestOnFinalizedBlockSingle() {
}

// expect that the block storage is indexed with each of the collection guarantee
s.blocks.On("IndexBlockContainingCollectionGuarantees", block.ID(), []flow.Identifier(flow.GetIDs(block.Payload.Guarantees))).Return(nil).Once()
s.blocks.On("BatchIndexBlockContainingCollectionGuarantees", mock.Anything, mock.Anything, block.ID(), []flow.Identifier(flow.GetIDs(block.Payload.Guarantees))).Return(nil).Once()
for _, seal := range block.Payload.Seals {
s.results.On("Index", seal.BlockID, seal.ResultID).Return(nil).Once()
}
Expand All @@ -321,7 +324,7 @@ func (s *Suite) TestOnFinalizedBlockSingle() {
// assert that the block was retrieved and all collections were requested
s.headers.AssertExpectations(s.T())
s.request.AssertNumberOfCalls(s.T(), "EntityByID", len(block.Payload.Guarantees))
s.results.AssertNumberOfCalls(s.T(), "Index", len(block.Payload.Seals))
s.results.AssertNumberOfCalls(s.T(), "BatchIndex", len(block.Payload.Seals))
}

// TestOnFinalizedBlockSeveralBlocksAhead checks OnFinalizedBlock with a block several blocks newer than the last block processed
Expand Down Expand Up @@ -370,7 +373,7 @@ func (s *Suite) TestOnFinalizedBlockSeveralBlocksAhead() {

// expected all new blocks after last block processed
for _, block := range blocks {
s.blocks.On("IndexBlockContainingCollectionGuarantees", block.ID(), []flow.Identifier(flow.GetIDs(block.Payload.Guarantees))).Return(nil).Once()
s.blocks.On("BatchIndexBlockContainingCollectionGuarantees", mock.Anything, mock.Anything, block.ID(), []flow.Identifier(flow.GetIDs(block.Payload.Guarantees))).Return(nil).Once()

for _, cg := range block.Payload.Guarantees {
s.request.On("EntityByID", cg.CollectionID, mock.Anything).Return().Run(func(args mock.Arguments) {
Expand Down Expand Up @@ -398,9 +401,9 @@ func (s *Suite) TestOnFinalizedBlockSeveralBlocksAhead() {
}

s.headers.AssertExpectations(s.T())
s.blocks.AssertNumberOfCalls(s.T(), "IndexBlockContainingCollectionGuarantees", newBlocksCount)
s.blocks.AssertNumberOfCalls(s.T(), "BatchIndexBlockContainingCollectionGuarantees", newBlocksCount)
s.request.AssertNumberOfCalls(s.T(), "EntityByID", expectedEntityByIDCalls)
s.results.AssertNumberOfCalls(s.T(), "Index", expectedIndexCalls)
s.results.AssertNumberOfCalls(s.T(), "BatchIndex", expectedIndexCalls)
}

// TestOnCollection checks that when a Collection is received, it is persisted
Expand Down
2 changes: 1 addition & 1 deletion engine/access/ingestion2/collection_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,12 +189,12 @@ func (s *CollectionSyncer) StartWorkerLoop(ctx irrecoverable.SignalerContext, re

// Create a lock context for indexing
lctx := s.lockManager.NewContext()
defer lctx.Release()
err := lctx.AcquireLock(storage.LockInsertCollection)
if err != nil {
ctx.Throw(fmt.Errorf("could not acquire lock for collection indexing: %w", err))
return
}
defer lctx.Release()
Copy link
Member Author

@zhangchiqing zhangchiqing Oct 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mistake like this will happen, in order to avoid this, I will create a separate PR to replace all AcquireLock usages with storage.WithLock


err = indexer.IndexCollection(lctx, collection, s.collections, s.logger, s.collectionExecutedMetric)
if err != nil {
Expand Down
Loading
Loading