diff --git a/blockchain/blockdao/blockdao.go b/blockchain/blockdao/blockdao.go index a3bdb493c8..a783ede3a5 100644 --- a/blockchain/blockdao/blockdao.go +++ b/blockchain/blockdao/blockdao.go @@ -74,6 +74,8 @@ type ( blockCache cache.LRUCache txLogCache cache.LRUCache tipHeight uint64 + // allowIndexerAhead indicates whether indexers are allowed to be ahead of DAO tip height + allowIndexerAhead bool } ) @@ -85,6 +87,13 @@ func WithBlobStore(bs BlobStore) Option { } } +// WithAllowIndexerAhead allows indexer to be ahead of DAO tip height +func WithAllowIndexerAhead() Option { + return func(dao *blockDAO) { + dao.allowIndexerAhead = true + } +} + // NewBlockDAOWithIndexersAndCache returns a BlockDAO with indexers which will consume blocks appended, and // caches which will speed up reading func NewBlockDAOWithIndexersAndCache(blkStore BlockStore, indexers []BlockIndexer, cacheSize int, opts ...Option) BlockDAO { @@ -123,6 +132,9 @@ func NewBlockDAOWithIndexersAndCache(blkStore BlockStore, indexers []BlockIndexe return nil } blockDAO.timerFactory = timerFactory + if blockDAO.allowIndexerAhead { + log.L().Warn("BlockDAO is configured to allow indexers to be ahead of DAO tip height") + } return blockDAO } @@ -151,7 +163,21 @@ func (dao *blockDAO) Start(ctx context.Context) error { } func (dao *blockDAO) checkIndexers(ctx context.Context, checker BlockIndexerChecker) error { + tip := atomic.LoadUint64(&dao.tipHeight) for i, indexer := range dao.indexers { + if dao.allowIndexerAhead { + idxHeight, err := indexer.Height() + if err != nil { + return errors.Wrap(err, "failed to get indexer height") + } + if idxHeight > tip { + log.L().Info( + "indexer is ahead of dao tip height, skipping check.", + zap.Int("indexer", i), + ) + continue + } + } if err := checker.CheckIndexer(ctx, indexer, 0, func(height uint64) { if height%5000 == 0 { log.L().Info( @@ -381,7 +407,20 @@ func (dao *blockDAO) PutBlock(ctx context.Context, blk *block.Block) error { // index the block if there's indexer timer = dao.timerFactory.NewTimer("index_block") defer timer.End() - for _, indexer := range dao.indexers { + for i, indexer := range dao.indexers { + if dao.allowIndexerAhead { + idxHeight, err := indexer.Height() + if err != nil { + return errors.Wrap(err, "failed to get indexer height") + } + if idxHeight >= blk.Height() { + log.L().Debug( + "indexer is ahead of dao tip height, skipping indexing.", + zap.Int("indexer", i), + ) + continue + } + } if err := indexer.PutBlock(ctx, blk); err != nil { return err } @@ -407,6 +446,14 @@ func (dao *blockDAO) GetBlobsByHeight(height uint64) ([]*types.BlobTxSidecar, [] return dao.blobStore.GetBlobsByHeight(height) } +func (dao *blockDAO) indexerAhead(indexer BlockIndexer, target uint64) (bool, error) { + idxHeight, err := indexer.Height() + if err != nil { + return false, errors.Wrap(err, "failed to get indexer height") + } + return idxHeight > target, nil +} + func lruCacheGet(c cache.LRUCache, key interface{}) (interface{}, bool) { if c != nil { return c.Get(key) diff --git a/blockindex/indexbuilder.go b/blockindex/indexbuilder.go index 2a873516d0..3814b6d53f 100644 --- a/blockindex/indexbuilder.go +++ b/blockindex/indexbuilder.go @@ -50,10 +50,12 @@ type IndexBuilder struct { dao blockdao.BlockStore indexer Indexer genesis genesis.Genesis + // allowIndexerAhead indicates whether indexers are allowed to be ahead of DAO tip height + allowIndexerAhead bool } // NewIndexBuilder instantiates an index builder -func NewIndexBuilder(chainID uint32, g genesis.Genesis, dao blockdao.BlockStore, indexer Indexer) (*IndexBuilder, error) { +func NewIndexBuilder(chainID uint32, g genesis.Genesis, dao blockdao.BlockStore, indexer Indexer, allowIndexerAhead bool) (*IndexBuilder, error) { timerFactory, err := prometheustimer.New( "iotex_indexer_batch_time", "Indexer batch time", @@ -64,10 +66,11 @@ func NewIndexBuilder(chainID uint32, g genesis.Genesis, dao blockdao.BlockStore, return nil, err } return &IndexBuilder{ - timerFactory: timerFactory, - dao: dao, - indexer: indexer, - genesis: g, + timerFactory: timerFactory, + dao: dao, + indexer: indexer, + genesis: g, + allowIndexerAhead: allowIndexerAhead, }, nil } @@ -96,6 +99,20 @@ func (ib *IndexBuilder) Indexer() Indexer { // ReceiveBlock handles the block and create the indices for the actions and receipts in it func (ib *IndexBuilder) ReceiveBlock(blk *block.Block) error { timer := ib.timerFactory.NewTimer("indexBlock") + if ib.allowIndexerAhead { + height, err := ib.indexer.Height() + if err != nil { + return err + } + if height >= blk.Height() { + log.L().Debug( + "indexer is ahead of or equal to the block height, skipping indexing.", + zap.Uint64("indexerHeight", height), + zap.Uint64("blockHeight", blk.Height()), + ) + return nil + } + } if err := ib.indexer.PutBlock(genesis.WithGenesisContext(context.Background(), ib.genesis), blk); err != nil { log.L().Error( "Error when indexing the block", @@ -126,6 +143,12 @@ func (ib *IndexBuilder) init(ctx context.Context) error { return nil } if startHeight > tipHeight { + if ib.allowIndexerAhead { + zap.L().Warn("IndexDB is ahead of BlockDAO", + zap.Uint64("indexerHeight", startHeight), + zap.Uint64("daoHeight", tipHeight)) + return nil + } // indexer height > dao height // this shouldn't happen unless blocks are deliberately removed from dao w/o removing index // in this case we revert the extra block index, but nothing we can do to revert action index diff --git a/chainservice/builder.go b/chainservice/builder.go index 9bd18f1dc8..0fa6f20d23 100644 --- a/chainservice/builder.go +++ b/chainservice/builder.go @@ -317,6 +317,9 @@ func (builder *Builder) buildBlockDAO(forTest bool) error { if err != nil { return err } + if _, ok := builder.cfg.Plugins[config.AllowBlockDaoIndexerAheadPlugin]; ok { + opts = append(opts, blockdao.WithAllowIndexerAhead()) + } builder.cs.blockdao = blockdao.NewBlockDAOWithIndexersAndCache( store, indexers, cfg.DB.MaxCacheSize, opts...) @@ -501,7 +504,8 @@ func (builder *Builder) buildBlockchain(forSubChain, forTest bool) error { if builder.cs.indexer != nil && builder.cfg.Chain.EnableAsyncIndexWrite { // config asks for a standalone indexer - indexBuilder, err := blockindex.NewIndexBuilder(builder.cs.chain.ChainID(), builder.cfg.Genesis, builder.cs.blockdao, builder.cs.indexer) + _, allowIndexerAhead := builder.cfg.Plugins[config.AllowBlockDaoIndexerAheadPlugin] + indexBuilder, err := blockindex.NewIndexBuilder(builder.cs.chain.ChainID(), builder.cfg.Genesis, builder.cs.blockdao, builder.cs.indexer, allowIndexerAhead) if err != nil { return errors.Wrap(err, "failed to create index builder") } diff --git a/config/config.go b/config/config.go index f07a82ab16..318c928d7f 100644 --- a/config/config.go +++ b/config/config.go @@ -44,6 +44,8 @@ const ( const ( // GatewayPlugin is the plugin of accepting user API requests and serving blockchain data to users GatewayPlugin = iota + // AllowBlockDaoIndexerAheadPlugin allows the block DAO indexer to be ahead of the DAO tip height + AllowBlockDaoIndexerAheadPlugin ) type strs []string @@ -189,6 +191,8 @@ func New(configPaths []string, _plugins []string, validates ...Validate) (Config switch strings.ToLower(plugin) { case "gateway": cfg.Plugins[GatewayPlugin] = nil + case "relax": + cfg.Plugins[AllowBlockDaoIndexerAheadPlugin] = nil default: return Config{}, errors.Errorf("Plugin %s is not supported", plugin) }