Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 48 additions & 1 deletion blockchain/blockdao/blockdao.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ type (
blockCache cache.LRUCache
txLogCache cache.LRUCache
tipHeight uint64
// allowIndexerAhead indicates whether indexers are allowed to be ahead of DAO tip height
allowIndexerAhead bool
}
)

Expand All @@ -85,6 +87,13 @@ func WithBlobStore(bs BlobStore) Option {
}
}

// WithAllowIndexerAhead allows indexer to be ahead of DAO tip height
func WithAllowIndexerAhead() Option {
return func(dao *blockDAO) {
dao.allowIndexerAhead = true
}
}

// NewBlockDAOWithIndexersAndCache returns a BlockDAO with indexers which will consume blocks appended, and
// caches which will speed up reading
func NewBlockDAOWithIndexersAndCache(blkStore BlockStore, indexers []BlockIndexer, cacheSize int, opts ...Option) BlockDAO {
Expand Down Expand Up @@ -123,6 +132,9 @@ func NewBlockDAOWithIndexersAndCache(blkStore BlockStore, indexers []BlockIndexe
return nil
}
blockDAO.timerFactory = timerFactory
if blockDAO.allowIndexerAhead {
log.L().Warn("BlockDAO is configured to allow indexers to be ahead of DAO tip height")
}
return blockDAO
}

Expand Down Expand Up @@ -151,7 +163,21 @@ func (dao *blockDAO) Start(ctx context.Context) error {
}

func (dao *blockDAO) checkIndexers(ctx context.Context, checker BlockIndexerChecker) error {
tip := atomic.LoadUint64(&dao.tipHeight)
for i, indexer := range dao.indexers {
if dao.allowIndexerAhead {
idxHeight, err := indexer.Height()
if err != nil {
return errors.Wrap(err, "failed to get indexer height")
}
if idxHeight > tip {
log.L().Info(
"indexer is ahead of dao tip height, skipping check.",
zap.Int("indexer", i),
)
continue
}
}
if err := checker.CheckIndexer(ctx, indexer, 0, func(height uint64) {
if height%5000 == 0 {
log.L().Info(
Expand Down Expand Up @@ -381,7 +407,20 @@ func (dao *blockDAO) PutBlock(ctx context.Context, blk *block.Block) error {
// index the block if there's indexer
timer = dao.timerFactory.NewTimer("index_block")
defer timer.End()
for _, indexer := range dao.indexers {
for i, indexer := range dao.indexers {
if dao.allowIndexerAhead {
idxHeight, err := indexer.Height()
if err != nil {
return errors.Wrap(err, "failed to get indexer height")
}
if idxHeight >= blk.Height() {
log.L().Debug(
"indexer is ahead of dao tip height, skipping indexing.",
zap.Int("indexer", i),
)
continue
}
}
if err := indexer.PutBlock(ctx, blk); err != nil {
return err
}
Expand All @@ -407,6 +446,14 @@ func (dao *blockDAO) GetBlobsByHeight(height uint64) ([]*types.BlobTxSidecar, []
return dao.blobStore.GetBlobsByHeight(height)
}

func (dao *blockDAO) indexerAhead(indexer BlockIndexer, target uint64) (bool, error) {
idxHeight, err := indexer.Height()
if err != nil {
return false, errors.Wrap(err, "failed to get indexer height")
}
return idxHeight > target, nil
}

func lruCacheGet(c cache.LRUCache, key interface{}) (interface{}, bool) {
if c != nil {
return c.Get(key)
Expand Down
33 changes: 28 additions & 5 deletions blockindex/indexbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,12 @@ type IndexBuilder struct {
dao blockdao.BlockStore
indexer Indexer
genesis genesis.Genesis
// allowIndexerAhead indicates whether indexers are allowed to be ahead of DAO tip height
allowIndexerAhead bool
}

// NewIndexBuilder instantiates an index builder
func NewIndexBuilder(chainID uint32, g genesis.Genesis, dao blockdao.BlockStore, indexer Indexer) (*IndexBuilder, error) {
func NewIndexBuilder(chainID uint32, g genesis.Genesis, dao blockdao.BlockStore, indexer Indexer, allowIndexerAhead bool) (*IndexBuilder, error) {
timerFactory, err := prometheustimer.New(
"iotex_indexer_batch_time",
"Indexer batch time",
Expand All @@ -64,10 +66,11 @@ func NewIndexBuilder(chainID uint32, g genesis.Genesis, dao blockdao.BlockStore,
return nil, err
}
return &IndexBuilder{
timerFactory: timerFactory,
dao: dao,
indexer: indexer,
genesis: g,
timerFactory: timerFactory,
dao: dao,
indexer: indexer,
genesis: g,
allowIndexerAhead: allowIndexerAhead,
}, nil
}

Expand Down Expand Up @@ -96,6 +99,20 @@ func (ib *IndexBuilder) Indexer() Indexer {
// ReceiveBlock handles the block and create the indices for the actions and receipts in it
func (ib *IndexBuilder) ReceiveBlock(blk *block.Block) error {
timer := ib.timerFactory.NewTimer("indexBlock")
if ib.allowIndexerAhead {
height, err := ib.indexer.Height()
if err != nil {
return err
}
if height >= blk.Height() {
log.L().Debug(
"indexer is ahead of or equal to the block height, skipping indexing.",
zap.Uint64("indexerHeight", height),
zap.Uint64("blockHeight", blk.Height()),
)
return nil
}
}
if err := ib.indexer.PutBlock(genesis.WithGenesisContext(context.Background(), ib.genesis), blk); err != nil {
log.L().Error(
"Error when indexing the block",
Expand Down Expand Up @@ -126,6 +143,12 @@ func (ib *IndexBuilder) init(ctx context.Context) error {
return nil
}
if startHeight > tipHeight {
if ib.allowIndexerAhead {
zap.L().Warn("IndexDB is ahead of BlockDAO",
zap.Uint64("indexerHeight", startHeight),
zap.Uint64("daoHeight", tipHeight))
return nil
}
// indexer height > dao height
// this shouldn't happen unless blocks are deliberately removed from dao w/o removing index
// in this case we revert the extra block index, but nothing we can do to revert action index
Expand Down
6 changes: 5 additions & 1 deletion chainservice/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,9 @@ func (builder *Builder) buildBlockDAO(forTest bool) error {
if err != nil {
return err
}
if _, ok := builder.cfg.Plugins[config.AllowBlockDaoIndexerAheadPlugin]; ok {
opts = append(opts, blockdao.WithAllowIndexerAhead())
}
builder.cs.blockdao = blockdao.NewBlockDAOWithIndexersAndCache(
store, indexers, cfg.DB.MaxCacheSize, opts...)

Expand Down Expand Up @@ -501,7 +504,8 @@ func (builder *Builder) buildBlockchain(forSubChain, forTest bool) error {

if builder.cs.indexer != nil && builder.cfg.Chain.EnableAsyncIndexWrite {
// config asks for a standalone indexer
indexBuilder, err := blockindex.NewIndexBuilder(builder.cs.chain.ChainID(), builder.cfg.Genesis, builder.cs.blockdao, builder.cs.indexer)
_, allowIndexerAhead := builder.cfg.Plugins[config.AllowBlockDaoIndexerAheadPlugin]
indexBuilder, err := blockindex.NewIndexBuilder(builder.cs.chain.ChainID(), builder.cfg.Genesis, builder.cs.blockdao, builder.cs.indexer, allowIndexerAhead)
if err != nil {
return errors.Wrap(err, "failed to create index builder")
}
Expand Down
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const (
const (
// GatewayPlugin is the plugin of accepting user API requests and serving blockchain data to users
GatewayPlugin = iota
// AllowBlockDaoIndexerAheadPlugin allows the block DAO indexer to be ahead of the DAO tip height
AllowBlockDaoIndexerAheadPlugin
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

)

type strs []string
Expand Down Expand Up @@ -189,6 +191,8 @@ func New(configPaths []string, _plugins []string, validates ...Validate) (Config
switch strings.ToLower(plugin) {
case "gateway":
cfg.Plugins[GatewayPlugin] = nil
case "relax":
cfg.Plugins[AllowBlockDaoIndexerAheadPlugin] = nil
default:
return Config{}, errors.Errorf("Plugin %s is not supported", plugin)
}
Expand Down