diff --git a/cmd/access/node_builder/access_node_builder.go b/cmd/access/node_builder/access_node_builder.go
index e06fd2ab29c..199e24caac7 100644
--- a/cmd/access/node_builder/access_node_builder.go
+++ b/cmd/access/node_builder/access_node_builder.go
@@ -562,8 +562,8 @@ func (builder *FlowAccessNodeBuilder) BuildConsensusFollower() *FlowAccessNodeBu
 
 func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccessNodeBuilder {
 var bs network.BlobService
- var processedBlockHeight storage.ConsumerProgressInitializer
- var processedNotifications storage.ConsumerProgressInitializer
+ var processedBlockHeightInitializer storage.ConsumerProgressInitializer
+ var processedNotificationsInitializer storage.ConsumerProgressInitializer
 var bsDependable *module.ProxiedReadyDoneAware
 var execDataDistributor *edrequester.ExecutionDataDistributor
 var execDataCacheBackend *herocache.BlockExecutionData
@@ -604,14 +604,14 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
 // writes execution data to.
 db := builder.ExecutionDatastoreManager.DB()
 
- processedBlockHeight = store.NewConsumerProgress(db, module.ConsumeProgressExecutionDataRequesterBlockHeight)
+ processedBlockHeightInitializer = store.NewConsumerProgress(db, module.ConsumeProgressExecutionDataRequesterBlockHeight)
 return nil
 }).
 Module("processed notifications consumer progress", func(node *cmd.NodeConfig) error {
 // Note: progress is stored in the datastore's DB since that is where the jobqueue
 // writes execution data to.
 db := builder.ExecutionDatastoreManager.DB()
- processedNotifications = store.NewConsumerProgress(db, module.ConsumeProgressExecutionDataRequesterNotification)
+ processedNotificationsInitializer = store.NewConsumerProgress(db, module.ConsumeProgressExecutionDataRequesterNotification)
 return nil
 }).
 Module("blobservice peer manager dependencies", func(node *cmd.NodeConfig) error {
@@ -747,6 +747,16 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
 execDataCacheBackend,
 )
 
+ // start processing from the initial block height resolved from the execution data config
+ processedBlockHeight, err := processedBlockHeightInitializer.Initialize(builder.executionDataConfig.InitialBlockHeight)
+ if err != nil {
+ return nil, fmt.Errorf("could not initialize processed block height: %w", err)
+ }
+ processedNotifications, err := processedNotificationsInitializer.Initialize(builder.executionDataConfig.InitialBlockHeight)
+ if err != nil {
+ return nil, fmt.Errorf("could not initialize processed notifications: %w", err)
+ }
+
 r, err := edrequester.New(
 builder.Logger,
 metrics.NewExecutionDataRequesterCollector(),
@@ -838,7 +848,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
 }
 
 if builder.executionDataIndexingEnabled {
- var indexedBlockHeight storage.ConsumerProgressInitializer
+ var indexedBlockHeightInitializer storage.ConsumerProgressInitializer
 
 builder.
 AdminCommand("execute-script", func(config *cmd.NodeConfig) commands.AdminCommand {
@@ -846,7 +856,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
 }).
 Module("indexed block height consumer progress", func(node *cmd.NodeConfig) error {
 // Note: progress is stored in the MAIN db since that is where indexed execution data is stored.
- indexedBlockHeight = store.NewConsumerProgress(builder.ProtocolDB, module.ConsumeProgressExecutionDataIndexerBlockHeight)
+ indexedBlockHeightInitializer = store.NewConsumerProgress(builder.ProtocolDB, module.ConsumeProgressExecutionDataIndexerBlockHeight)
 return nil
 }).
 Module("transaction results storage", func(node *cmd.NodeConfig) error {
@@ -959,6 +969,13 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
 node.StorageLockMgr,
 )
 
+ // start processing from the first height of the registers db, which is initialized from
+ // the checkpoint. this ensures a consistent starting point for the indexed data.
+ indexedBlockHeight, err := indexedBlockHeightInitializer.Initialize(registers.FirstHeight())
+ if err != nil {
+ return nil, fmt.Errorf("could not initialize indexed block height: %w", err)
+ }
+
 // execution state worker uses a jobqueue to process new execution data and indexes it by using the indexer.
 builder.ExecutionIndexer, err = indexer.NewIndexer(
 builder.Logger,
@@ -1719,8 +1736,8 @@ func (builder *FlowAccessNodeBuilder) enqueueRelayNetwork() {
 }
 
 func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
- var processedFinalizedBlockHeight storage.ConsumerProgressInitializer
- var processedTxErrorMessagesBlockHeight storage.ConsumerProgressInitializer
+ var processedFinalizedBlockHeightInitializer storage.ConsumerProgressInitializer
+ var processedTxErrorMessagesBlockHeightInitializer storage.ConsumerProgressInitializer
 
 if builder.executionDataSyncEnabled {
 builder.BuildExecutionSyncComponents()
@@ -1945,7 +1962,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
 return nil
 }).
 Module("processed finalized block height consumer progress", func(node *cmd.NodeConfig) error {
- processedFinalizedBlockHeight = store.NewConsumerProgress(builder.ProtocolDB, module.ConsumeProgressIngestionEngineBlockHeight)
+ processedFinalizedBlockHeightInitializer = store.NewConsumerProgress(builder.ProtocolDB, module.ConsumeProgressIngestionEngineBlockHeight)
 return nil
 }).
 Module("processed last full block height monotonic consumer progress", func(node *cmd.NodeConfig) error {
@@ -2233,6 +2250,12 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
 )
 builder.RequestEng.WithHandle(collectionSyncer.OnCollectionDownloaded)
 
+ // start ingesting finalized block from the earliest block in storage (sealed root block height)
+ processedFinalizedBlockHeight, err := processedFinalizedBlockHeightInitializer.Initialize(node.SealedRootBlock.Height)
+ if err != nil {
+ return nil, fmt.Errorf("could not initialize processed finalized block height: %w", err)
+ }
+
 builder.IngestEng, err = ingestion.New(
 node.Logger,
 node.EngineRegistry,
@@ -2279,13 +2302,19 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
 return nil
 }).
 Module("processed error messages block height consumer progress", func(node *cmd.NodeConfig) error {
- processedTxErrorMessagesBlockHeight = store.NewConsumerProgress(
+ processedTxErrorMessagesBlockHeightInitializer = store.NewConsumerProgress(
 builder.ProtocolDB,
 module.ConsumeProgressEngineTxErrorMessagesBlockHeight,
 )
 return nil
 }).
 Component("transaction result error messages engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
+ // start processing from the earliest block in storage (sealed root block height)
+ processedTxErrorMessagesBlockHeight, err := processedTxErrorMessagesBlockHeightInitializer.Initialize(node.SealedRootBlock.Height)
+ if err != nil {
+ return nil, fmt.Errorf("could not initialize processed tx error messages block height: %w", err)
+ }
+
 engine, err := tx_error_messages.New(
 node.Logger,
 metrics.NewTransactionErrorMessagesCollector(),
diff --git a/cmd/observer/node_builder/observer_builder.go b/cmd/observer/node_builder/observer_builder.go
index e2958d9fefa..0bdabfca00e 100644
--- a/cmd/observer/node_builder/observer_builder.go
+++ b/cmd/observer/node_builder/observer_builder.go
@@ -1102,8 +1102,8 @@ func (builder *ObserverServiceBuilder) Build() (cmd.Node, error) {
 func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverServiceBuilder {
 var ds datastore.Batching
 var bs network.BlobService
- var processedBlockHeight storage.ConsumerProgressInitializer
- var processedNotifications storage.ConsumerProgressInitializer
+ var processedBlockHeightInitializer storage.ConsumerProgressInitializer
+ var processedNotificationsInitializer storage.ConsumerProgressInitializer
 var publicBsDependable *module.ProxiedReadyDoneAware
 var execDataDistributor *edrequester.ExecutionDataDistributor
 var execDataCacheBackend *herocache.BlockExecutionData
@@ -1148,7 +1148,7 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
 // writes execution data to.
 db := builder.ExecutionDatastoreManager.DB()
 
- processedBlockHeight = store.NewConsumerProgress(db, module.ConsumeProgressExecutionDataRequesterBlockHeight)
+ processedBlockHeightInitializer = store.NewConsumerProgress(db, module.ConsumeProgressExecutionDataRequesterBlockHeight)
 return nil
 }).
 Module("processed notifications consumer progress", func(node *cmd.NodeConfig) error {
@@ -1156,7 +1156,7 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
 // writes execution data to.
 db := builder.ExecutionDatastoreManager.DB()
 
- processedNotifications = store.NewConsumerProgress(db, module.ConsumeProgressExecutionDataRequesterNotification)
+ processedNotificationsInitializer = store.NewConsumerProgress(db, module.ConsumeProgressExecutionDataRequesterNotification)
 return nil
 }).
 Module("blobservice peer manager dependencies", func(node *cmd.NodeConfig) error {
@@ -1285,6 +1285,16 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
 execDataCacheBackend,
 )
 
+ // start processing from the initial block height resolved from the execution data config
+ processedBlockHeight, err := processedBlockHeightInitializer.Initialize(builder.executionDataConfig.InitialBlockHeight)
+ if err != nil {
+ return nil, fmt.Errorf("could not initialize processed block height: %w", err)
+ }
+ processedNotifications, err := processedNotificationsInitializer.Initialize(builder.executionDataConfig.InitialBlockHeight)
+ if err != nil {
+ return nil, fmt.Errorf("could not initialize processed notifications: %w", err)
+ }
+
 r, err := edrequester.New(
 builder.Logger,
 metrics.NewExecutionDataRequesterCollector(),
@@ -1341,11 +1351,11 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
 return builder.ExecutionDataPruner, nil
 })
 if builder.executionDataIndexingEnabled {
- var indexedBlockHeight storage.ConsumerProgressInitializer
+ var indexedBlockHeightInitializer storage.ConsumerProgressInitializer
 
 builder.Module("indexed block height consumer progress", func(node *cmd.NodeConfig) error {
 // Note: progress is stored in the MAIN db since that is where indexed execution data is stored.
- indexedBlockHeight = store.NewConsumerProgress(builder.ProtocolDB, module.ConsumeProgressExecutionDataIndexerBlockHeight)
+ indexedBlockHeightInitializer = store.NewConsumerProgress(builder.ProtocolDB, module.ConsumeProgressExecutionDataIndexerBlockHeight)
 return nil
 }).Module("transaction results storage", func(node *cmd.NodeConfig) error {
 builder.lightTransactionResults = store.NewLightTransactionResults(node.Metrics.Cache, node.ProtocolDB, bstorage.DefaultCacheSize)
@@ -1456,6 +1466,13 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
 node.StorageLockMgr,
 )
 
+ // start processing from the first height of the registers db, which is initialized from
+ // the checkpoint. this ensures a consistent starting point for the indexed data.
+ indexedBlockHeight, err := indexedBlockHeightInitializer.Initialize(registers.FirstHeight())
+ if err != nil {
+ return nil, fmt.Errorf("could not initialize indexed block height: %w", err)
+ }
+
 // execution state worker uses a jobqueue to process new execution data and indexes it by using the indexer.
 builder.ExecutionIndexer, err = indexer.NewIndexer(
 builder.Logger,
diff --git a/cmd/verification_builder.go b/cmd/verification_builder.go
index 47dbfd153a8..2581c23e929 100644
--- a/cmd/verification_builder.go
+++ b/cmd/verification_builder.go
@@ -91,11 +91,11 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() {
 var (
 followerState protocol.FollowerState
 
- chunkStatuses *stdmap.ChunkStatuses // used in fetcher engine
- chunkRequests *stdmap.ChunkRequests // used in requester engine
- processedChunkIndex storage.ConsumerProgressInitializer // used in chunk consumer
- processedBlockHeight storage.ConsumerProgressInitializer // used in block consumer
- chunkQueue storage.ChunksQueue // used in chunk consumer
+ chunkStatuses *stdmap.ChunkStatuses // used in fetcher engine
+ chunkRequests *stdmap.ChunkRequests // used in requester engine
+ processedChunkIndexInitializer storage.ConsumerProgressInitializer // used in chunk consumer
+ processedBlockHeightInitializer storage.ConsumerProgressInitializer // used in block consumer
+ chunkQueue storage.ChunksQueue // used in chunk consumer
 syncCore *chainsync.Core // used in follower engine
 
 assignerEngine *assigner.Engine // the assigner engine
@@ -158,11 +158,11 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() {
 return nil
 }).
 Module("processed chunk index consumer progress", func(node *NodeConfig) error {
- processedChunkIndex = store.NewConsumerProgress(node.ProtocolDB, module.ConsumeProgressVerificationChunkIndex)
+ processedChunkIndexInitializer = store.NewConsumerProgress(node.ProtocolDB, module.ConsumeProgressVerificationChunkIndex)
 return nil
 }).
 Module("processed block height consumer progress", func(node *NodeConfig) error {
- processedBlockHeight = store.NewConsumerProgress(node.ProtocolDB, module.ConsumeProgressVerificationBlockHeight)
+ processedBlockHeightInitializer = store.NewConsumerProgress(node.ProtocolDB, module.ConsumeProgressVerificationBlockHeight)
 return nil
 }).
 Module("chunks queue", func(node *NodeConfig) error {
@@ -269,6 +269,11 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() {
 requesterEngine,
 v.verConf.stopAtHeight)
 
+ processedChunkIndex, err := processedChunkIndexInitializer.Initialize(chunkconsumer.DefaultJobIndex)
+ if err != nil {
+ return nil, fmt.Errorf("could not initialize processed index: %w", err)
+ }
+
 // requester and fetcher engines are started by chunk consumer
 chunkConsumer, err = chunkconsumer.NewChunkConsumer(
 node.Logger,
@@ -312,10 +317,17 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() {
 return assignerEngine, nil
 }).
 Component("block consumer", func(node *NodeConfig) (module.ReadyDoneAware, error) {
- var initBlockHeight uint64
- var err error
+ sealedHead, err := node.State.Sealed().Head()
+ if err != nil {
+ return nil, fmt.Errorf("could not get sealed head: %w", err)
+ }
+
+ processedBlockHeight, err := processedBlockHeightInitializer.Initialize(sealedHead.Height)
+ if err != nil {
+ return nil, fmt.Errorf("could not initialize processed block height index: %w", err)
+ }
 
- blockConsumer, initBlockHeight, err = blockconsumer.NewBlockConsumer(
+ blockConsumer, err = blockconsumer.NewBlockConsumer(
 node.Logger,
 collector,
 processedBlockHeight,
@@ -335,7 +347,7 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() {
 
 node.Logger.Info().
 Str("component", "node-builder").
- Uint64("init_height", initBlockHeight).
+ Uint64("init_height", sealedHead.Height).
 Msg("block consumer initialized")
 
 return blockConsumer, nil
diff --git a/engine/access/access_test.go b/engine/access/access_test.go
index f5620fafe66..e77cba1ffd9 100644
--- a/engine/access/access_test.go
+++ b/engine/access/access_test.go
@@ -715,13 +715,14 @@ func (suite *Suite) TestGetSealedTransaction() {
 )
 require.NoError(suite.T(), err)
 
- progress, err := store.NewConsumerProgress(db, module.ConsumeProgressLastFullBlockHeight).Initialize(suite.rootBlock.Height)
+ progress, err := store.NewConsumerProgress(db, module.ConsumeProgressLastFullBlockHeight).Initialize(suite.finalizedBlock.Height)
 require.NoError(suite.T(), err)
 
 lastFullBlockHeight, err := counters.NewPersistentStrictMonotonicCounter(progress)
 require.NoError(suite.T(), err)
 
 // create the ingest engine
- processedHeight := store.NewConsumerProgress(db, module.ConsumeProgressIngestionEngineBlockHeight)
+ processedHeight, err := store.NewConsumerProgress(db, module.ConsumeProgressIngestionEngineBlockHeight).Initialize(suite.finalizedBlock.Height)
+ require.NoError(suite.T(), err)
 collectionSyncer := ingestion.NewCollectionSyncer(
 suite.log,
@@ -937,10 +938,12 @@ func (suite *Suite) TestGetTransactionResult() {
 )
 require.NoError(suite.T(), err)
 
- processedHeightInitializer := store.NewConsumerProgress(db, module.ConsumeProgressIngestionEngineBlockHeight)
+ processedHeight, err := store.NewConsumerProgress(db, module.ConsumeProgressIngestionEngineBlockHeight).
+ Initialize(suite.finalizedBlock.Height)
+ require.NoError(suite.T(), err)
 
 lastFullBlockHeightProgress, err := store.NewConsumerProgress(db, module.ConsumeProgressLastFullBlockHeight).
- Initialize(suite.rootBlock.Height)
+ Initialize(suite.finalizedBlock.Height)
 require.NoError(suite.T(), err)
 
 lastFullBlockHeight, err := counters.NewPersistentStrictMonotonicCounter(lastFullBlockHeightProgress)
@@ -966,7 +969,7 @@ func (suite *Suite) TestGetTransactionResult() {
 all.Blocks,
 all.Results,
 all.Receipts,
- processedHeightInitializer,
+ processedHeight,
 collectionSyncer,
 collectionExecutedMetric,
 nil,
@@ -1204,9 +1207,11 @@ func (suite *Suite) TestExecuteScript() {
 Once()
 
 processedHeightInitializer := store.NewConsumerProgress(db, module.ConsumeProgressIngestionEngineBlockHeight)
+ processedHeight, err := processedHeightInitializer.Initialize(suite.finalizedBlock.Height)
+ require.NoError(suite.T(), err)
 
 lastFullBlockHeightInitializer := store.NewConsumerProgress(db, module.ConsumeProgressLastFullBlockHeight)
- lastFullBlockHeightProgress, err := lastFullBlockHeightInitializer.Initialize(suite.rootBlock.Height)
+ lastFullBlockHeightProgress, err := lastFullBlockHeightInitializer.Initialize(suite.finalizedBlock.Height)
 require.NoError(suite.T(), err)
 
 lastFullBlockHeight, err := counters.NewPersistentStrictMonotonicCounter(lastFullBlockHeightProgress)
@@ -1232,7 +1237,7 @@ func (suite *Suite) TestExecuteScript() {
 all.Blocks,
 all.Results,
 all.Receipts,
- processedHeightInitializer,
+ processedHeight,
 collectionSyncer,
 collectionExecutedMetric,
 nil,
diff --git a/engine/access/ingestion/engine.go b/engine/access/ingestion/engine.go
index d544b3effd8..1e0851922f1 100644
--- a/engine/access/ingestion/engine.go
+++ b/engine/access/ingestion/engine.go
@@ -104,7 +104,7 @@ func New(
 blocks storage.Blocks,
 executionResults storage.ExecutionResults,
 executionReceipts storage.ExecutionReceipts,
- finalizedProcessedHeight storage.ConsumerProgressInitializer,
+ finalizedProcessedHeight storage.ConsumerProgress,
 collectionSyncer *CollectionSyncer,
 collectionExecutedMetric module.CollectionExecutedMetric,
 txErrorMessagesCore *tx_error_messages.TxErrorMessagesCore,
@@ -153,11 +153,6 @@ func New(
 // to get a sequential list of finalized blocks.
 finalizedBlockReader := jobqueue.NewFinalizedBlockReader(state, blocks)
 
- defaultIndex, err := e.defaultProcessedIndex()
- if err != nil {
- return nil, fmt.Errorf("could not read default finalized processed index: %w", err)
- }
-
 // create a jobqueue that will process new available finalized block. The `finalizedBlockNotifier` is used to
 // signal new work, which is being triggered on the `processFinalizedBlockJob` handler.
 e.finalizedBlockConsumer, err = jobqueue.NewComponentConsumer(
@@ -165,7 +160,6 @@ func New(
 e.finalizedBlockNotifier.Channel(),
 finalizedProcessedHeight,
 finalizedBlockReader,
- defaultIndex,
 e.processFinalizedBlockJob,
 processFinalizedBlocksWorkersCount,
 searchAhead,
@@ -201,20 +195,6 @@ func New(
 return e, nil
 }
 
-// defaultProcessedIndex returns the last finalized block height from the protocol state.
-//
-// The finalizedBlockConsumer utilizes this return height to fetch and consume block jobs from
-// jobs queue the first time it initializes.
-//
-// No errors are expected during normal operation.
-func (e *Engine) defaultProcessedIndex() (uint64, error) {
- final, err := e.state.Final().Head()
- if err != nil {
- return 0, fmt.Errorf("could not get finalized height: %w", err)
- }
- return final.Height, nil
-}
-
 // runFinalizedBlockConsumer runs the finalizedBlockConsumer component
 func (e *Engine) runFinalizedBlockConsumer(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
 e.finalizedBlockConsumer.Start(ctx)
diff --git a/engine/access/ingestion/engine_test.go b/engine/access/ingestion/engine_test.go
index ece7712446a..357a1acb438 100644
--- a/engine/access/ingestion/engine_test.go
+++ b/engine/access/ingestion/engine_test.go
@@ -189,6 +189,8 @@ func (s *Suite) SetupTest() {
 // It waits until the ingestion engine starts.
 func (s *Suite) initEngineAndSyncer(ctx irrecoverable.SignalerContext) (*Engine, *CollectionSyncer) {
 processedHeightInitializer := store.NewConsumerProgress(s.db, module.ConsumeProgressIngestionEngineBlockHeight)
+ processedHeight, err := processedHeightInitializer.Initialize(s.finalizedBlock.Height)
+ require.NoError(s.T(), err)
 
 lastFullBlockHeight, err := store.NewConsumerProgress(s.db, module.ConsumeProgressLastFullBlockHeight).Initialize(s.finalizedBlock.Height)
 require.NoError(s.T(), err)
@@ -216,7 +218,7 @@ func (s *Suite) initEngineAndSyncer(ctx irrecoverable.SignalerContext) (*Engine,
 s.blocks,
 s.results,
 s.receipts,
- processedHeightInitializer,
+ processedHeight,
 syncer,
 s.collectionExecutedMetric,
 nil,
diff --git a/engine/access/ingestion/tx_error_messages/tx_error_messages_engine.go b/engine/access/ingestion/tx_error_messages/tx_error_messages_engine.go
index b5629829c89..e81a1fea127 100644
--- a/engine/access/ingestion/tx_error_messages/tx_error_messages_engine.go
+++ b/engine/access/ingestion/tx_error_messages/tx_error_messages_engine.go
@@ -71,7 +71,7 @@ func New(
 metrics module.TransactionErrorMessagesMetrics,
 state protocol.State,
 headers storage.Headers,
- txErrorMessagesProcessedHeight storage.ConsumerProgressInitializer,
+ txErrorMessagesProcessedHeight storage.ConsumerProgress,
 txErrorMessagesCore *TxErrorMessagesCore,
 ) (*Engine, error) {
 e := &Engine{
@@ -102,7 +102,6 @@ func New(
 e.txErrorMessagesNotifier.Channel(),
 txErrorMessagesProcessedHeight,
 sealedBlockReader,
- e.state.Params().SealedRoot().Height,
 e.processTxResultErrorMessagesJob,
 processTxErrorMessagesWorkersCount,
 0,
diff --git a/engine/access/ingestion/tx_error_messages/tx_error_messages_engine_test.go b/engine/access/ingestion/tx_error_messages/tx_error_messages_engine_test.go
index 7acc1f4ad01..2dc7f16d645 100644
--- a/engine/access/ingestion/tx_error_messages/tx_error_messages_engine_test.go
+++ b/engine/access/ingestion/tx_error_messages/tx_error_messages_engine_test.go
@@ -128,10 +128,7 @@ func (s *TxErrorMessagesEngineSuite) SetupTest() {
 ).Maybe()
 
 s.proto.state.On("Params").Return(s.proto.params)
-
- // Mock the finalized and sealed root block header with height 0.
 s.proto.params.On("FinalizedRoot").Return(s.rootBlock.ToHeader(), nil)
- s.proto.params.On("SealedRoot").Return(s.rootBlock.ToHeader(), nil)
 
 s.proto.snapshot.On("Head").Return(
 func() *flow.Header {
@@ -150,10 +147,8 @@ func (s *TxErrorMessagesEngineSuite) SetupTest() {
 // initEngine creates a new instance of the transaction error messages engine
 // and waits for it to start. It initializes the engine with mocked components and state.
 func (s *TxErrorMessagesEngineSuite) initEngine(ctx irrecoverable.SignalerContext) *Engine {
- processedTxErrorMessagesBlockHeight := store.NewConsumerProgress(
- s.db,
- module.ConsumeProgressEngineTxErrorMessagesBlockHeight,
- )
+ processedTxErrorMessagesBlockHeight, err := store.NewConsumerProgress(s.db, module.ConsumeProgressEngineTxErrorMessagesBlockHeight).Initialize(s.rootBlock.Height)
+ require.NoError(s.T(), err)
 
 execNodeIdentitiesProvider := commonrpc.NewExecutionNodeIdentitiesProvider(
 s.log,
diff --git a/engine/access/ingestion2/engine_test.go b/engine/access/ingestion2/engine_test.go
index c7813d7b028..475d3c1d169 100644
--- a/engine/access/ingestion2/engine_test.go
+++ b/engine/access/ingestion2/engine_test.go
@@ -201,7 +201,8 @@ func (s *Suite) TestComponentShutdown() {
 // initEngineAndSyncer create new instance of ingestion engine and collection collectionSyncer.
 // It waits until the ingestion engine starts.
 func (s *Suite) initEngineAndSyncer(ctx irrecoverable.SignalerContext) (*Engine, *CollectionSyncer) {
- processedHeightInitializer := store.NewConsumerProgress(s.db, module.ConsumeProgressIngestionEngineBlockHeight)
+ processedHeightInitializer, err := store.NewConsumerProgress(s.db, module.ConsumeProgressIngestionEngineBlockHeight).Initialize(s.finalizedBlock.Height)
+ require.NoError(s.T(), err)
 
 lastFullBlockHeight, err := store.NewConsumerProgress(s.db, module.ConsumeProgressLastFullBlockHeight).Initialize(s.finalizedBlock.Height)
 require.NoError(s.T(), err)
diff --git a/engine/access/ingestion2/finalized_block_processor.go b/engine/access/ingestion2/finalized_block_processor.go
index 78511dd9803..c485f139906 100644
--- a/engine/access/ingestion2/finalized_block_processor.go
+++ b/engine/access/ingestion2/finalized_block_processor.go
@@ -62,15 +62,11 @@ func NewFinalizedBlockProcessor(
 state protocol.State,
 blocks storage.Blocks,
 executionResults storage.ExecutionResults,
- finalizedProcessedHeight storage.ConsumerProgressInitializer,
+ finalizedProcessedHeight storage.ConsumerProgress,
 syncer *CollectionSyncer,
 collectionExecutedMetric module.CollectionExecutedMetric,
 ) (*FinalizedBlockProcessor, error) {
 reader := jobqueue.NewFinalizedBlockReader(state, blocks)
- finalizedBlock, err := state.Final().Head()
- if err != nil {
- return nil, fmt.Errorf("could not get finalized block header: %w", err)
- }
 consumerNotifier := engine.NewNotifier()
 
 processor := &FinalizedBlockProcessor{
@@ -82,12 +78,12 @@ func NewFinalizedBlockProcessor(
 collectionExecutedMetric: collectionExecutedMetric,
 }
 
+ var err error
 processor.consumer, err = jobqueue.NewComponentConsumer(
 log.With().Str("module", "ingestion_block_consumer").Logger(),
 consumerNotifier.Channel(),
 finalizedProcessedHeight,
 reader,
- finalizedBlock.Height,
 processor.processFinalizedBlockJobCallback,
 finalizedBlockProcessorWorkerCount,
 searchAhead,
diff --git a/engine/testutil/mock/nodes.go b/engine/testutil/mock/nodes.go
index 4754f1f5081..f4fe6513ecc 100644
--- a/engine/testutil/mock/nodes.go
+++ b/engine/testutil/mock/nodes.go
@@ -302,12 +302,12 @@ type VerificationNode struct {
 Receipts storage.ExecutionReceipts
 
 // chunk consumer and processor for fetcher engine
- ProcessedChunkIndex storage.ConsumerProgressInitializer
+ ProcessedChunkIndex storage.ConsumerProgress
 ChunksQueue storage.ChunksQueue
 ChunkConsumer *chunkconsumer.ChunkConsumer
 
 // block consumer for chunk consumer
- ProcessedBlockHeight storage.ConsumerProgressInitializer
+ ProcessedBlockHeight storage.ConsumerProgress
 BlockConsumer *blockconsumer.BlockConsumer
 
 VerifierEngine *verifier.Engine
diff --git a/engine/testutil/nodes.go b/engine/testutil/nodes.go
index c0f969b43c9..30b849d510e 100644
--- a/engine/testutil/nodes.go
+++ b/engine/testutil/nodes.go
@@ -1015,7 +1015,8 @@ func VerificationNode(t testing.TB,
 }
 
 if node.ProcessedChunkIndex == nil {
- node.ProcessedChunkIndex = store.NewConsumerProgress(node.PublicDB, module.ConsumeProgressVerificationChunkIndex)
+ node.ProcessedChunkIndex, err = store.NewConsumerProgress(node.PublicDB, module.ConsumeProgressVerificationChunkIndex).Initialize(chunkconsumer.DefaultJobIndex)
+ require.NoError(t, err)
 }
 
 if node.ChunksQueue == nil {
@@ -1027,7 +1028,11 @@ func VerificationNode(t testing.TB,
 }
 
 if node.ProcessedBlockHeight == nil {
- node.ProcessedBlockHeight = store.NewConsumerProgress(node.PublicDB, module.ConsumeProgressVerificationBlockHeight)
+ sealedHead, err := node.State.Sealed().Head()
+ require.NoError(t, err)
+
+ node.ProcessedBlockHeight, err = store.NewConsumerProgress(node.PublicDB, module.ConsumeProgressVerificationBlockHeight).Initialize(sealedHead.Height)
+ require.NoError(t, err)
 }
 if node.VerifierEngine == nil {
 vm := fvm.NewVirtualMachine()
@@ -1119,7 +1124,7 @@ func VerificationNode(t testing.TB,
 }
 
 if node.BlockConsumer == nil {
- node.BlockConsumer, _, err = blockconsumer.NewBlockConsumer(node.Log,
+ node.BlockConsumer, err = blockconsumer.NewBlockConsumer(node.Log,
 collector,
 node.ProcessedBlockHeight,
 node.Blocks,
diff --git a/engine/verification/assigner/blockconsumer/consumer.go b/engine/verification/assigner/blockconsumer/consumer.go
index 89871fecb65..31730c41615 100644
--- a/engine/verification/assigner/blockconsumer/consumer.go
+++ b/engine/verification/assigner/blockconsumer/consumer.go
@@ -26,27 +26,15 @@ type BlockConsumer struct {
 metrics module.VerificationMetrics
 }
 
-// defaultProcessedIndex returns the last sealed block height from the protocol state.
-//
-// The BlockConsumer utilizes this return height to fetch and consume block jobs from
-// jobs queue the first time it initializes.
-func defaultProcessedIndex(state protocol.State) (uint64, error) {
- final, err := state.Sealed().Head()
- if err != nil {
- return 0, fmt.Errorf("could not get finalized height: %w", err)
- }
- return final.Height, nil
-}
-
 // NewBlockConsumer creates a new consumer and returns the default processed
 // index for initializing the processed index in storage.
 func NewBlockConsumer(log zerolog.Logger,
 metrics module.VerificationMetrics,
- processedHeight storage.ConsumerProgressInitializer,
+ processedHeight storage.ConsumerProgress,
 blocks storage.Blocks,
 state protocol.State,
 blockProcessor assigner.FinalizedBlockProcessor,
- maxProcessing uint64) (*BlockConsumer, uint64, error) {
+ maxProcessing uint64) (*BlockConsumer, error) {
 
 lg := log.With().Str("module", "block_consumer").Logger()
@@ -58,14 +46,9 @@ func NewBlockConsumer(log zerolog.Logger,
 // the block reader is where the consumer reads new finalized blocks from (i.e., jobs).
 jobs := jobqueue.NewFinalizedBlockReader(state, blocks)
 
- defaultIndex, err := defaultProcessedIndex(state)
- if err != nil {
- return nil, 0, fmt.Errorf("could not read default processed index: %w", err)
- }
-
- consumer, err := jobqueue.NewConsumer(lg, jobs, processedHeight, worker, maxProcessing, 0, defaultIndex)
+ consumer, err := jobqueue.NewConsumer(lg, jobs, processedHeight, worker, maxProcessing, 0)
 if err != nil {
- return nil, 0, fmt.Errorf("could not create block consumer: %w", err)
+ return nil, fmt.Errorf("could not create block consumer: %w", err)
 }
 
 blockConsumer := &BlockConsumer{
@@ -75,7 +58,7 @@ func NewBlockConsumer(log zerolog.Logger,
 }
 worker.withBlockConsumer(blockConsumer)
 
- return blockConsumer, defaultIndex, nil
+ return blockConsumer, nil
 }
 
 // NotifyJobIsDone is invoked by the worker to let the consumer know that it is done
diff --git a/engine/verification/assigner/blockconsumer/consumer_test.go b/engine/verification/assigner/blockconsumer/consumer_test.go
index 471aba2a328..db39e1b111f 100644
--- a/engine/verification/assigner/blockconsumer/consumer_test.go
+++ b/engine/verification/assigner/blockconsumer/consumer_test.go
@@ -122,7 +122,6 @@ func withConsumer(
 unittest.RunWithPebbleDB(t, func(pdb *pebble.DB) {
 maxProcessing := uint64(workerCount)
 
- processedHeight := store.NewConsumerProgress(pebbleimpl.ToDB(pdb), module.ConsumeProgressVerificationBlockHeight)
 collector := &metrics.NoopCollector{}
 tracer := trace.NewNoopTracer()
 log := unittest.Logger()
@@ -134,7 +133,13 @@ func withConsumer(
 process: process,
 }
 
- consumer, _, err := blockconsumer.NewBlockConsumer(
+ sealedHead, err := s.State.Sealed().Head()
+ require.NoError(t, err)
+
+ processedHeight, err := store.NewConsumerProgress(pebbleimpl.ToDB(pdb), module.ConsumeProgressVerificationBlockHeight).Initialize(sealedHead.Height)
+ require.NoError(t, err)
+
+ consumer, err := blockconsumer.NewBlockConsumer(
 unittest.Logger(),
 collector,
 processedHeight,
diff --git a/engine/verification/fetcher/chunkconsumer/consumer.go b/engine/verification/fetcher/chunkconsumer/consumer.go
index 66a71aa3dca..73352828818 100644
--- a/engine/verification/fetcher/chunkconsumer/consumer.go
+++ b/engine/verification/fetcher/chunkconsumer/consumer.go
@@ -29,7 +29,7 @@ type ChunkConsumer struct {
 func NewChunkConsumer(
 log zerolog.Logger,
 metrics module.VerificationMetrics,
- processedIndexInitializer storage.ConsumerProgressInitializer, // to persist the processed index
+ processedIndex storage.ConsumerProgress, // to persist the processed index
 chunksQueue storage.ChunksQueue, // to read jobs (chunks) from
 chunkProcessor fetcher.AssignedChunkProcessor, // to process jobs (chunks)
 maxProcessing uint64, // max number of jobs to be processed in parallel
@@ -40,7 +40,7 @@ func NewChunkConsumer(
 jobs := &ChunkJobs{locators: chunksQueue}
 lg := log.With().Str("module", "chunk_consumer").Logger()
 
- consumer, err := jobqueue.NewConsumer(lg, jobs, processedIndexInitializer, worker, maxProcessing, 0, DefaultJobIndex)
+ consumer, err := jobqueue.NewConsumer(lg, jobs, processedIndex, worker, maxProcessing, 0)
 if err != nil {
 return nil, err
 }
diff --git a/engine/verification/fetcher/chunkconsumer/consumer_test.go b/engine/verification/fetcher/chunkconsumer/consumer_test.go
index e314a4627ae..e0079bf5cd4 100644
--- a/engine/verification/fetcher/chunkconsumer/consumer_test.go
+++ b/engine/verification/fetcher/chunkconsumer/consumer_test.go
@@ -148,7 +148,8 @@ func WithConsumer(
 db := pebbleimpl.ToDB(pebbleDB)
 collector := &metrics.NoopCollector{}
 
- processedIndex := store.NewConsumerProgress(db, module.ConsumeProgressVerificationChunkIndex)
+ processedIndex, err := store.NewConsumerProgress(db, module.ConsumeProgressVerificationChunkIndex).Initialize(chunkconsumer.DefaultJobIndex)
+ require.NoError(t, err)
 chunksQueue := store.NewChunkQueue(collector, db)
 ok, err := chunksQueue.Init(chunkconsumer.DefaultJobIndex)
 require.NoError(t, err)
diff --git a/module/jobqueue/component_consumer.go b/module/jobqueue/component_consumer.go
index 457aed3f804..8656f59c945 100644
--- a/module/jobqueue/component_consumer.go
+++ b/module/jobqueue/component_consumer.go
@@ -27,14 +27,12 @@ type ComponentConsumer struct {
 func NewComponentConsumer(
 log zerolog.Logger,
 workSignal <-chan struct{},
- progressInitializer storage.ConsumerProgressInitializer,
+ progress storage.ConsumerProgress,
 jobs module.Jobs,
- defaultIndex uint64,
 processor JobProcessor, // method used to process jobs
 maxProcessing uint64,
 maxSearchAhead uint64,
 ) (*ComponentConsumer, error) {
-
 c := &ComponentConsumer{
 workSignal: workSignal,
 jobs: jobs,
@@ -48,7 +46,7 @@ func NewComponentConsumer(
 maxProcessing,
 )
 
- consumer, err := NewConsumer(log, jobs, progressInitializer, worker, maxProcessing, maxSearchAhead, defaultIndex)
+ consumer, err := NewConsumer(log, jobs, progress, worker, maxProcessing, maxSearchAhead)
 if err != nil {
 return nil, err
 }
diff --git a/module/jobqueue/component_consumer_test.go b/module/jobqueue/component_consumer_test.go
index 8f4ec576580..dc121f0d96c 100644
--- a/module/jobqueue/component_consumer_test.go
+++ b/module/jobqueue/component_consumer_test.go
@@ -87,15 +87,11 @@ func (suite *ComponentConsumerSuite) prepareTest(
 progress.On("ProcessedIndex").Return(suite.defaultIndex, nil)
 progress.On("SetProcessedIndex", mock.AnythingOfType("uint64")).Return(nil)
 
- progressInitializer := new(storagemock.ConsumerProgressInitializer)
- progressInitializer.On("Initialize", mock.AnythingOfType("uint64")).Return(progress, nil)
-
 consumer, err := NewComponentConsumer(
 unittest.Logger(),
 workSignal,
- progressInitializer,
+ progress,
 jobs,
- suite.defaultIndex,
 processor,
 suite.maxProcessing,
 suite.maxSearchAhead,
diff --git a/module/jobqueue/consumer.go b/module/jobqueue/consumer.go
index 035f625dfaf..4ffb4998184 100644
--- a/module/jobqueue/consumer.go
+++ b/module/jobqueue/consumer.go
@@ -50,18 +50,11 @@ type Consumer struct {
 func NewConsumer(
 log zerolog.Logger,
 jobs module.Jobs,
- progressInitializer storage.ConsumerProgressInitializer,
+ progress storage.ConsumerProgress,
 worker Worker,
 maxProcessing uint64,
 maxSearchAhead uint64,
- defaultIndex uint64,
 ) (*Consumer, error) {
-
- progress, err := progressInitializer.Initialize(defaultIndex)
- if err != nil {
- return nil, fmt.Errorf("could not initialize processed index: %w", err)
- }
-
 processedIndex, err := progress.ProcessedIndex()
 if err != nil {
 return nil, fmt.Errorf("could not read processed index: %w", err)
diff --git a/module/jobqueue/consumer_behavior_test.go b/module/jobqueue/consumer_behavior_test.go
index b26f1249720..8e6cfe7eed8 100644
--- a/module/jobqueue/consumer_behavior_test.go
+++ b/module/jobqueue/consumer_behavior_test.go
@@ -580,8 +580,12 @@ func assertProcessed(t testing.TB, cp storage.ConsumerProgress, expectProcessed
 
 func newTestConsumer(t testing.TB, cp storage.ConsumerProgressInitializer, jobs module.Jobs, worker jobqueue.Worker, maxSearchAhead uint64, defaultIndex uint64) module.JobConsumer {
 log := unittest.Logger().With().Str("module", "consumer").Logger()
+
+ progress, err := cp.Initialize(defaultIndex)
+ require.NoError(t, err)
+
 maxProcessing := uint64(3)
- c, err := jobqueue.NewConsumer(log, jobs, cp, worker, maxProcessing, maxSearchAhead, defaultIndex)
+ c, err := jobqueue.NewConsumer(log, jobs, progress, worker, maxProcessing, maxSearchAhead)
 require.NoError(t, err)
 return c
 }
diff --git a/module/jobqueue/consumer_test.go b/module/jobqueue/consumer_test.go
index 4c672d73876..70810f758f2 100644
--- a/module/jobqueue/consumer_test.go
+++ b/module/jobqueue/consumer_test.go
@@ -160,10 +160,11 @@ func TestProcessedIndexDeletion(t *testing.T) {
 dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) {
 log := unittest.Logger().With().Str("module", "consumer").Logger()
 jobs := NewMockJobs()
- progressInitializer := store.NewConsumerProgress(db, "consumer")
+ progress, err := store.NewConsumerProgress(db, "consumer").Initialize(0)
+ require.NoError(t, err)
 worker := newMockWorker()
 maxProcessing := uint64(3)
 
- c, err := NewConsumer(log, jobs, progressInitializer, worker, maxProcessing, 0, 0)
+ c, err := NewConsumer(log, jobs, progress, worker, maxProcessing, 0)
 require.NoError(t, err)
 worker.WithConsumer(c)
@@ -200,11 +201,10 @@ func TestCheckBeforeStartIsNoop(t *testing.T) {
 c, err := NewConsumer(
 unittest.Logger(),
 NewMockJobs(),
- progressInitializer,
+ progress,
 worker,
 uint64(3),
 0,
- 10, // default index is before the stored processedIndex
 )
 require.NoError(t, err)
 worker.WithConsumer(c)
diff --git a/module/state_synchronization/indexer/indexer.go b/module/state_synchronization/indexer/indexer.go
index db164a2c12d..5ac761083f5 100644
--- a/module/state_synchronization/indexer/indexer.go
+++ b/module/state_synchronization/indexer/indexer.go
@@ -70,13 +70,14 @@ func NewIndexer(
 indexer *IndexerCore,
 executionCache *cache.ExecutionDataCache,
 executionDataLatestHeight func() (uint64, error),
- processedHeightInitializer storage.ConsumerProgressInitializer,
+ processedHeight storage.ConsumerProgress,
 ) (*Indexer, error) {
+
 r := &Indexer{
 log: log.With().Str("module", "execution_indexer").Logger(),
 exeDataNotifier: engine.NewNotifier(),
 blockIndexedNotifier: engine.NewNotifier(),
- lastProcessedHeight: atomic.NewUint64(initHeight),
+ lastProcessedHeight: atomic.NewUint64(initHeight), // TODO(peter): I think this should be processedHeight.ProcessedIndex(), not initHeight
 indexer: indexer,
 registers: registers,
 ProcessedHeightRecorder: execution_data.NewProcessedHeightRecorderManager(initHeight),
@@ -89,9 +90,8 @@ func NewIndexer(
 jobConsumer, err := jobqueue.NewComponentConsumer(
 r.log,
 r.exeDataNotifier.Channel(),
- processedHeightInitializer,
+ processedHeight,
 r.exeDataReader,
- initHeight,
 r.processExecutionData,
 workersCount,
 searchAhead,
diff --git a/module/state_synchronization/indexer/indexer_test.go b/module/state_synchronization/indexer/indexer_test.go
index 83f647244d2..4a00ab296ec 100644
--- a/module/state_synchronization/indexer/indexer_test.go
+++ b/module/state_synchronization/indexer/indexer_test.go
@@ -83,7 +83,7 @@ func newIndexerTest(t *testing.T, g *fixtures.GeneratorSuite, blocks []*flow.Blo
 indexerCoreTest.indexer,
 exeCache,
 test.latestHeight,
- &mockProgressInitializer{progress: progress},
+ progress,
 )
 require.NoError(t, err)
 
@@ -119,14 +119,6 @@ func (w *indexerTest) run(ctx irrecoverable.SignalerContext, reachHeight uint64,
 unittest.RequireCloseBefore(w.t, w.worker.Done(), testTimeout, "timeout waiting for the consumer to be done")
 }
 
-type mockProgressInitializer struct {
- progress *mockProgress
-}
-
-func (m *mockProgressInitializer) Initialize(defaultIndex uint64) (storage.ConsumerProgress, error) {
- return m.progress, nil
-}
-
 var _ storage.ConsumerProgress = (*mockProgress)(nil)
 
 type mockProgress struct {
diff --git a/module/state_synchronization/requester/execution_data_requester.go b/module/state_synchronization/requester/execution_data_requester.go
index c6c04823f01..f7a21e2302a 100644
--- a/module/state_synchronization/requester/execution_data_requester.go
+++ b/module/state_synchronization/requester/execution_data_requester.go
@@ -145,8 +145,8 @@ func New(
 edrMetrics module.ExecutionDataRequesterMetrics,
 downloader execution_data.Downloader,
 execDataCache *cache.ExecutionDataCache,
- processedHeight storage.ConsumerProgressInitializer,
- processedNotifications storage.ConsumerProgressInitializer,
+ processedHeight storage.ConsumerProgress,
+ processedNotifications storage.ConsumerProgress,
 state protocol.State,
 headers storage.Headers,
 cfg ExecutionDataConfig,
@@ -184,7 +184,6 @@ func New(
 e.finalizationNotifier.Channel(), // to listen to finalization events to find newly sealed blocks
 processedHeight, // read and persist the downloaded height
 sealedBlockReader, // read sealed blocks by height
- e.config.InitialBlockHeight, // initial "last processed" height for empty db
 e.processBlockJob, // process the sealed block job to download its execution data
 fetchWorkers, // the number of concurrent workers
 e.config.MaxSearchAhead, // max number of unsent notifications to allow before pausing new fetches
@@ -231,7 +230,6 @@ func New(
 executionDataNotifier.Channel(), // listen for notifications from the block consumer
 processedNotifications, // read and persist the notified height
 e.executionDataReader, // read execution data by height
- e.config.InitialBlockHeight, // initial "last processed" height for empty db
 e.processNotificationJob, // process the job to send notifications for an execution data
 1, // use a single worker to ensure notification is delivered in consecutive order
 0, // search ahead limit controlled by worker count
diff --git a/module/state_synchronization/requester/execution_data_requester_test.go b/module/state_synchronization/requester/execution_data_requester_test.go
index 3684679d900..ec8720e7fc4 100644
--- a/module/state_synchronization/requester/execution_data_requester_test.go
+++ b/module/state_synchronization/requester/execution_data_requester_test.go
@@ -412,8 +412,10 @@ func (suite *ExecutionDataRequesterSuite) prepareRequesterTest(cfg *fetchTestRun
 edCache := cache.NewExecutionDataCache(suite.downloader, headers, seals, results, heroCache)
 followerDistributor := pubsub.NewFollowerDistributor()
 
- processedHeight := store.NewConsumerProgress(pebbleimpl.ToDB(suite.db), module.ConsumeProgressExecutionDataRequesterBlockHeight)
- processedNotification := store.NewConsumerProgress(pebbleimpl.ToDB(suite.db), module.ConsumeProgressExecutionDataRequesterNotification)
+ processedHeight, err := store.NewConsumerProgress(pebbleimpl.ToDB(suite.db), module.ConsumeProgressExecutionDataRequesterBlockHeight).Initialize(cfg.startHeight - 1)
+ suite.Require().NoError(err)
+ processedNotification, err := store.NewConsumerProgress(pebbleimpl.ToDB(suite.db), module.ConsumeProgressExecutionDataRequesterNotification).Initialize(cfg.startHeight - 1)
+ suite.Require().NoError(err)
 
 edr, err := New(
 logger,
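Illustrative sketch (not part of the diff) of the wiring pattern this change introduces: the caller resolves a storage.ConsumerProgress once via ConsumerProgressInitializer.Initialize(defaultIndex) and passes it to jobqueue.NewConsumer, whose defaultIndex parameter has been removed. The helper buildConsumer and its parameters are hypothetical; only the store and jobqueue calls mirror the post-change signatures shown above.

package example

import (
	"fmt"

	"github.com/rs/zerolog"

	"github.com/onflow/flow-go/module"
	"github.com/onflow/flow-go/module/jobqueue"
	"github.com/onflow/flow-go/storage"
	"github.com/onflow/flow-go/storage/store"
)

// buildConsumer is a hypothetical helper showing the post-change pattern:
// the initializer is resolved to a ConsumerProgress at the call site, and
// jobqueue.NewConsumer no longer receives a default index.
func buildConsumer(
	log zerolog.Logger,
	db storage.DB,
	consumerKey string, // e.g. module.ConsumeProgressIngestionEngineBlockHeight
	defaultIndex uint64, // index to start from when no progress is stored yet
	jobs module.Jobs,
	worker jobqueue.Worker,
) (module.JobConsumer, error) {
	// Initialize persists defaultIndex only for an empty db (no processed index
	// stored yet) and returns the ConsumerProgress used for reads and updates.
	progress, err := store.NewConsumerProgress(db, consumerKey).Initialize(defaultIndex)
	if err != nil {
		return nil, fmt.Errorf("could not initialize processed index: %w", err)
	}

	// The consumer starts from whatever ProcessedIndex() reports; maxProcessing
	// and maxSearchAhead (3 and 0 here) are arbitrary values for this sketch.
	consumer, err := jobqueue.NewConsumer(log, jobs, progress, worker, 3, 0)
	if err != nil {
		return nil, fmt.Errorf("could not create consumer: %w", err)
	}
	return consumer, nil
}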