Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 39 additions & 10 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,8 +562,8 @@

func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccessNodeBuilder {
var bs network.BlobService
var processedBlockHeight storage.ConsumerProgressInitializer
var processedNotifications storage.ConsumerProgressInitializer
var processedBlockHeightInitializer storage.ConsumerProgressInitializer
var processedNotificationsInitializer storage.ConsumerProgressInitializer
var bsDependable *module.ProxiedReadyDoneAware
var execDataDistributor *edrequester.ExecutionDataDistributor
var execDataCacheBackend *herocache.BlockExecutionData
Expand Down Expand Up @@ -604,14 +604,14 @@
// writes execution data to.
db := builder.ExecutionDatastoreManager.DB()

processedBlockHeight = store.NewConsumerProgress(db, module.ConsumeProgressExecutionDataRequesterBlockHeight)
processedBlockHeightInitializer = store.NewConsumerProgress(db, module.ConsumeProgressExecutionDataRequesterBlockHeight)
return nil
}).
Module("processed notifications consumer progress", func(node *cmd.NodeConfig) error {
// Note: progress is stored in the datastore's DB since that is where the jobqueue
// writes execution data to.
db := builder.ExecutionDatastoreManager.DB()
processedNotifications = store.NewConsumerProgress(db, module.ConsumeProgressExecutionDataRequesterNotification)
processedNotificationsInitializer = store.NewConsumerProgress(db, module.ConsumeProgressExecutionDataRequesterNotification)
return nil
}).
Module("blobservice peer manager dependencies", func(node *cmd.NodeConfig) error {
Expand Down Expand Up @@ -747,6 +747,16 @@
execDataCacheBackend,
)

// start processing from the initial block height resolved from the execution data config
processedBlockHeight, err := processedBlockHeightInitializer.Initialize(builder.executionDataConfig.InitialBlockHeight)
if err != nil {
return nil, fmt.Errorf("could not initialize processed block height: %w", err)
}
processedNotifications, err := processedNotificationsInitializer.Initialize(builder.executionDataConfig.InitialBlockHeight)
if err != nil {
return nil, fmt.Errorf("could not initialize processed notifications: %w", err)
}

r, err := edrequester.New(
builder.Logger,
metrics.NewExecutionDataRequesterCollector(),
Expand Down Expand Up @@ -838,15 +848,15 @@
}

if builder.executionDataIndexingEnabled {
var indexedBlockHeight storage.ConsumerProgressInitializer
var indexedBlockHeightInitializer storage.ConsumerProgressInitializer

builder.
AdminCommand("execute-script", func(config *cmd.NodeConfig) commands.AdminCommand {
return stateSyncCommands.NewExecuteScriptCommand(builder.ScriptExecutor)
}).
Module("indexed block height consumer progress", func(node *cmd.NodeConfig) error {
// Note: progress is stored in the MAIN db since that is where indexed execution data is stored.
indexedBlockHeight = store.NewConsumerProgress(builder.ProtocolDB, module.ConsumeProgressExecutionDataIndexerBlockHeight)
indexedBlockHeightInitializer = store.NewConsumerProgress(builder.ProtocolDB, module.ConsumeProgressExecutionDataIndexerBlockHeight)
return nil
}).
Module("transaction results storage", func(node *cmd.NodeConfig) error {
Expand Down Expand Up @@ -959,6 +969,13 @@
node.StorageLockMgr,
)

// start processing from the first height of the registers db, which is initialized from
// the checkpoint. this ensures a consistent starting point for the indexed data.
indexedBlockHeight, err := indexedBlockHeightInitializer.Initialize(registers.FirstHeight())
if err != nil {
return nil, fmt.Errorf("could not initialize indexed block height: %w", err)
}

// execution state worker uses a jobqueue to process new execution data and indexes it by using the indexer.
builder.ExecutionIndexer, err = indexer.NewIndexer(
builder.Logger,
Expand Down Expand Up @@ -1719,8 +1736,8 @@
}

func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
var processedFinalizedBlockHeight storage.ConsumerProgressInitializer
var processedTxErrorMessagesBlockHeight storage.ConsumerProgressInitializer
var processedFinalizedBlockHeightInitializer storage.ConsumerProgressInitializer
var processedTxErrorMessagesBlockHeightInitializer storage.ConsumerProgressInitializer

if builder.executionDataSyncEnabled {
builder.BuildExecutionSyncComponents()
Expand Down Expand Up @@ -1945,7 +1962,7 @@
return nil
}).
Module("processed finalized block height consumer progress", func(node *cmd.NodeConfig) error {
processedFinalizedBlockHeight = store.NewConsumerProgress(builder.ProtocolDB, module.ConsumeProgressIngestionEngineBlockHeight)
processedFinalizedBlockHeightInitializer = store.NewConsumerProgress(builder.ProtocolDB, module.ConsumeProgressIngestionEngineBlockHeight)
return nil
}).
Module("processed last full block height monotonic consumer progress", func(node *cmd.NodeConfig) error {
Expand Down Expand Up @@ -2233,6 +2250,12 @@
)
builder.RequestEng.WithHandle(collectionSyncer.OnCollectionDownloaded)

// start ingesting finalized block from the earliest block in storage (sealed root block height)
processedFinalizedBlockHeight, err := processedFinalizedBlockHeightInitializer.Initialize(node.SealedRootBlock.Height)
if err != nil {
return nil, fmt.Errorf("could not initialize processed finalized block height: %w", err)
}

builder.IngestEng, err = ingestion.New(
node.Logger,
node.EngineRegistry,
Expand Down Expand Up @@ -2279,13 +2302,19 @@
return nil
}).
Module("processed error messages block height consumer progress", func(node *cmd.NodeConfig) error {
processedTxErrorMessagesBlockHeight = store.NewConsumerProgress(
processedTxErrorMessagesBlockHeightInitializer = store.NewConsumerProgress(
builder.ProtocolDB,
module.ConsumeProgressEngineTxErrorMessagesBlockHeight,
)
return nil
}).
Component("transaction result error messages engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
// start processing from the earliest block in storage (sealed root block height)
processedTxErrorMessagesBlockHeight, err := processedTxErrorMessagesBlockHeightInitializer.Initialize(node.SealedRootBlock.Height)
if err != nil {
return nil, fmt.Errorf("could not initialize processed tx error messages block height: %w", err)
}

Check failure on line 2317 in cmd/access/node_builder/access_node_builder.go

View workflow job for this annotation

GitHub Actions / Lint (./)

File is not properly formatted (goimports)
engine, err := tx_error_messages.New(
node.Logger,
metrics.NewTransactionErrorMessagesCollector(),
Expand Down
29 changes: 23 additions & 6 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1102,8 +1102,8 @@ func (builder *ObserverServiceBuilder) Build() (cmd.Node, error) {
func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverServiceBuilder {
var ds datastore.Batching
var bs network.BlobService
var processedBlockHeight storage.ConsumerProgressInitializer
var processedNotifications storage.ConsumerProgressInitializer
var processedBlockHeightInitializer storage.ConsumerProgressInitializer
var processedNotificationsInitializer storage.ConsumerProgressInitializer
var publicBsDependable *module.ProxiedReadyDoneAware
var execDataDistributor *edrequester.ExecutionDataDistributor
var execDataCacheBackend *herocache.BlockExecutionData
Expand Down Expand Up @@ -1148,15 +1148,15 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
// writes execution data to.
db := builder.ExecutionDatastoreManager.DB()

processedBlockHeight = store.NewConsumerProgress(db, module.ConsumeProgressExecutionDataRequesterBlockHeight)
processedBlockHeightInitializer = store.NewConsumerProgress(db, module.ConsumeProgressExecutionDataRequesterBlockHeight)
return nil
}).
Module("processed notifications consumer progress", func(node *cmd.NodeConfig) error {
// Note: progress is stored in the datastore's DB since that is where the jobqueue
// writes execution data to.
db := builder.ExecutionDatastoreManager.DB()

processedNotifications = store.NewConsumerProgress(db, module.ConsumeProgressExecutionDataRequesterNotification)
processedNotificationsInitializer = store.NewConsumerProgress(db, module.ConsumeProgressExecutionDataRequesterNotification)
return nil
}).
Module("blobservice peer manager dependencies", func(node *cmd.NodeConfig) error {
Expand Down Expand Up @@ -1285,6 +1285,16 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
execDataCacheBackend,
)

// start processing from the initial block height resolved from the execution data config
processedBlockHeight, err := processedBlockHeightInitializer.Initialize(builder.executionDataConfig.InitialBlockHeight)
if err != nil {
return nil, fmt.Errorf("could not initialize processed block height: %w", err)
}
processedNotifications, err := processedNotificationsInitializer.Initialize(builder.executionDataConfig.InitialBlockHeight)
if err != nil {
return nil, fmt.Errorf("could not initialize processed notifications: %w", err)
}

r, err := edrequester.New(
builder.Logger,
metrics.NewExecutionDataRequesterCollector(),
Expand Down Expand Up @@ -1341,11 +1351,11 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
return builder.ExecutionDataPruner, nil
})
if builder.executionDataIndexingEnabled {
var indexedBlockHeight storage.ConsumerProgressInitializer
var indexedBlockHeightInitializer storage.ConsumerProgressInitializer

builder.Module("indexed block height consumer progress", func(node *cmd.NodeConfig) error {
// Note: progress is stored in the MAIN db since that is where indexed execution data is stored.
indexedBlockHeight = store.NewConsumerProgress(builder.ProtocolDB, module.ConsumeProgressExecutionDataIndexerBlockHeight)
indexedBlockHeightInitializer = store.NewConsumerProgress(builder.ProtocolDB, module.ConsumeProgressExecutionDataIndexerBlockHeight)
return nil
}).Module("transaction results storage", func(node *cmd.NodeConfig) error {
builder.lightTransactionResults = store.NewLightTransactionResults(node.Metrics.Cache, node.ProtocolDB, bstorage.DefaultCacheSize)
Expand Down Expand Up @@ -1456,6 +1466,13 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
node.StorageLockMgr,
)

// start processing from the first height of the registers db, which is initialized from
// the checkpoint. this ensures a consistent starting point for the indexed data.
indexedBlockHeight, err := indexedBlockHeightInitializer.Initialize(registers.FirstHeight())
if err != nil {
return nil, fmt.Errorf("could not initialize indexed block height: %w", err)
}

// execution state worker uses a jobqueue to process new execution data and indexes it by using the indexer.
builder.ExecutionIndexer, err = indexer.NewIndexer(
builder.Logger,
Expand Down
34 changes: 23 additions & 11 deletions cmd/verification_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,11 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() {
var (
followerState protocol.FollowerState

chunkStatuses *stdmap.ChunkStatuses // used in fetcher engine
chunkRequests *stdmap.ChunkRequests // used in requester engine
processedChunkIndex storage.ConsumerProgressInitializer // used in chunk consumer
processedBlockHeight storage.ConsumerProgressInitializer // used in block consumer
chunkQueue storage.ChunksQueue // used in chunk consumer
chunkStatuses *stdmap.ChunkStatuses // used in fetcher engine
chunkRequests *stdmap.ChunkRequests // used in requester engine
processedChunkIndexInitializer storage.ConsumerProgressInitializer // used in chunk consumer
processedBlockHeightInitializer storage.ConsumerProgressInitializer // used in block consumer
chunkQueue storage.ChunksQueue // used in chunk consumer

syncCore *chainsync.Core // used in follower engine
assignerEngine *assigner.Engine // the assigner engine
Expand Down Expand Up @@ -158,11 +158,11 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() {
return nil
}).
Module("processed chunk index consumer progress", func(node *NodeConfig) error {
processedChunkIndex = store.NewConsumerProgress(node.ProtocolDB, module.ConsumeProgressVerificationChunkIndex)
processedChunkIndexInitializer = store.NewConsumerProgress(node.ProtocolDB, module.ConsumeProgressVerificationChunkIndex)
return nil
}).
Module("processed block height consumer progress", func(node *NodeConfig) error {
processedBlockHeight = store.NewConsumerProgress(node.ProtocolDB, module.ConsumeProgressVerificationBlockHeight)
processedBlockHeightInitializer = store.NewConsumerProgress(node.ProtocolDB, module.ConsumeProgressVerificationBlockHeight)
return nil
}).
Module("chunks queue", func(node *NodeConfig) error {
Expand Down Expand Up @@ -269,6 +269,11 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() {
requesterEngine,
v.verConf.stopAtHeight)

processedChunkIndex, err := processedChunkIndexInitializer.Initialize(chunkconsumer.DefaultJobIndex)
if err != nil {
return nil, fmt.Errorf("could not initialize processed index: %w", err)
}

// requester and fetcher engines are started by chunk consumer
chunkConsumer, err = chunkconsumer.NewChunkConsumer(
node.Logger,
Expand Down Expand Up @@ -312,10 +317,17 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() {
return assignerEngine, nil
}).
Component("block consumer", func(node *NodeConfig) (module.ReadyDoneAware, error) {
var initBlockHeight uint64
var err error
sealedHead, err := node.State.Sealed().Head()
if err != nil {
return nil, fmt.Errorf("could not get sealed head: %w", err)
}

processedBlockHeight, err := processedBlockHeightInitializer.Initialize(sealedHead.Height)
if err != nil {
return nil, fmt.Errorf("could not initialize processed block height index: %w", err)
}

blockConsumer, initBlockHeight, err = blockconsumer.NewBlockConsumer(
blockConsumer, err = blockconsumer.NewBlockConsumer(
node.Logger,
collector,
processedBlockHeight,
Expand All @@ -335,7 +347,7 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() {

node.Logger.Info().
Str("component", "node-builder").
Uint64("init_height", initBlockHeight).
Uint64("init_height", sealedHead.Height).
Msg("block consumer initialized")

return blockConsumer, nil
Expand Down
19 changes: 12 additions & 7 deletions engine/access/access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -715,13 +715,14 @@ func (suite *Suite) TestGetSealedTransaction() {
)
require.NoError(suite.T(), err)

progress, err := store.NewConsumerProgress(db, module.ConsumeProgressLastFullBlockHeight).Initialize(suite.rootBlock.Height)
progress, err := store.NewConsumerProgress(db, module.ConsumeProgressLastFullBlockHeight).Initialize(suite.finalizedBlock.Height)
require.NoError(suite.T(), err)
lastFullBlockHeight, err := counters.NewPersistentStrictMonotonicCounter(progress)
require.NoError(suite.T(), err)

// create the ingest engine
processedHeight := store.NewConsumerProgress(db, module.ConsumeProgressIngestionEngineBlockHeight)
processedHeight, err := store.NewConsumerProgress(db, module.ConsumeProgressIngestionEngineBlockHeight).Initialize(suite.finalizedBlock.Height)
require.NoError(suite.T(), err)

collectionSyncer := ingestion.NewCollectionSyncer(
suite.log,
Expand Down Expand Up @@ -937,10 +938,12 @@ func (suite *Suite) TestGetTransactionResult() {
)
require.NoError(suite.T(), err)

processedHeightInitializer := store.NewConsumerProgress(db, module.ConsumeProgressIngestionEngineBlockHeight)
processedHeight, err := store.NewConsumerProgress(db, module.ConsumeProgressIngestionEngineBlockHeight).
Initialize(suite.finalizedBlock.Height)
require.NoError(suite.T(), err)

lastFullBlockHeightProgress, err := store.NewConsumerProgress(db, module.ConsumeProgressLastFullBlockHeight).
Initialize(suite.rootBlock.Height)
Initialize(suite.finalizedBlock.Height)
require.NoError(suite.T(), err)

lastFullBlockHeight, err := counters.NewPersistentStrictMonotonicCounter(lastFullBlockHeightProgress)
Expand All @@ -966,7 +969,7 @@ func (suite *Suite) TestGetTransactionResult() {
all.Blocks,
all.Results,
all.Receipts,
processedHeightInitializer,
processedHeight,
collectionSyncer,
collectionExecutedMetric,
nil,
Expand Down Expand Up @@ -1204,9 +1207,11 @@ func (suite *Suite) TestExecuteScript() {
Once()

processedHeightInitializer := store.NewConsumerProgress(db, module.ConsumeProgressIngestionEngineBlockHeight)
processedHeight, err := processedHeightInitializer.Initialize(suite.finalizedBlock.Height)
require.NoError(suite.T(), err)

lastFullBlockHeightInitializer := store.NewConsumerProgress(db, module.ConsumeProgressLastFullBlockHeight)
lastFullBlockHeightProgress, err := lastFullBlockHeightInitializer.Initialize(suite.rootBlock.Height)
lastFullBlockHeightProgress, err := lastFullBlockHeightInitializer.Initialize(suite.finalizedBlock.Height)
require.NoError(suite.T(), err)

lastFullBlockHeight, err := counters.NewPersistentStrictMonotonicCounter(lastFullBlockHeightProgress)
Expand All @@ -1232,7 +1237,7 @@ func (suite *Suite) TestExecuteScript() {
all.Blocks,
all.Results,
all.Receipts,
processedHeightInitializer,
processedHeight,
collectionSyncer,
collectionExecutedMetric,
nil,
Expand Down
22 changes: 1 addition & 21 deletions engine/access/ingestion/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func New(
blocks storage.Blocks,
executionResults storage.ExecutionResults,
executionReceipts storage.ExecutionReceipts,
finalizedProcessedHeight storage.ConsumerProgressInitializer,
finalizedProcessedHeight storage.ConsumerProgress,
collectionSyncer *CollectionSyncer,
collectionExecutedMetric module.CollectionExecutedMetric,
txErrorMessagesCore *tx_error_messages.TxErrorMessagesCore,
Expand Down Expand Up @@ -153,19 +153,13 @@ func New(
// to get a sequential list of finalized blocks.
finalizedBlockReader := jobqueue.NewFinalizedBlockReader(state, blocks)

defaultIndex, err := e.defaultProcessedIndex()
if err != nil {
return nil, fmt.Errorf("could not read default finalized processed index: %w", err)
}

// create a jobqueue that will process new available finalized block. The `finalizedBlockNotifier` is used to
// signal new work, which is being triggered on the `processFinalizedBlockJob` handler.
e.finalizedBlockConsumer, err = jobqueue.NewComponentConsumer(
e.log.With().Str("module", "ingestion_block_consumer").Logger(),
e.finalizedBlockNotifier.Channel(),
finalizedProcessedHeight,
finalizedBlockReader,
defaultIndex,
e.processFinalizedBlockJob,
processFinalizedBlocksWorkersCount,
searchAhead,
Expand Down Expand Up @@ -201,20 +195,6 @@ func New(
return e, nil
}

// defaultProcessedIndex returns the last finalized block height from the protocol state.
//
// The finalizedBlockConsumer utilizes this return height to fetch and consume block jobs from
// jobs queue the first time it initializes.
//
// No errors are expected during normal operation.
func (e *Engine) defaultProcessedIndex() (uint64, error) {
final, err := e.state.Final().Head()
if err != nil {
return 0, fmt.Errorf("could not get finalized height: %w", err)
}
return final.Height, nil
}

// runFinalizedBlockConsumer runs the finalizedBlockConsumer component
func (e *Engine) runFinalizedBlockConsumer(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
e.finalizedBlockConsumer.Start(ctx)
Expand Down
Loading
Loading