Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
e4404c3
chore: address comments from previous PR
Mar 27, 2025
5d39841
refactor(validitywindow): initialization flow
Mar 28, 2025
db938f5
chore(chain): rename `executionBlockParser` to `blockParser`
Mar 28, 2025
6c0bf1c
Merge branch 'main' of github.com:ava-labs/hypersdk into followup-pre…
Apr 11, 2025
f3c2f22
chore(chain): address PR comment and remove `ExecutionBlockParser` in…
Apr 15, 2025
c9cc266
feat(validitywindow): add full window observation tracking and startu…
Apr 15, 2025
6c015d6
feat(vm): require populated validity window on state sync finish. Ens…
Apr 15, 2025
bb68275
lint
Apr 15, 2025
e7c56b6
chore: typo
Apr 15, 2025
76ce137
docs
Apr 15, 2025
a128136
Merge branch 'main' into followup-pre-populate-chain-validity-window
Elvis339 Apr 15, 2025
83b5138
chore(vm): revert change to `GetExecutionBlock`
Apr 15, 2025
aec68dc
feat(vm): replace validity window populated check from state sync to …
Apr 15, 2025
5dd98a5
test(validitywindow): Populated func
Apr 15, 2025
5fac630
feat(validitywindow): rename `populateValidityWindow` to `Populate`
Apr 15, 2025
c347e32
feat(validitywindow): Complete
Apr 17, 2025
db01989
tests
Apr 17, 2025
199469c
test
Apr 17, 2025
74dcea7
Merge branch 'main' into followup-pre-populate-chain-validity-window
Elvis339 Apr 17, 2025
3581f26
chore(syncer): address PR error style comment
Apr 18, 2025
d263903
docs(validitywindow)
Apr 18, 2025
a47f09a
docs
Apr 18, 2025
72c2b90
feat(chain_index): Combine error handling for updating the last accep…
Apr 18, 2025
580cb64
feat(vm): return error instead of panic
Apr 22, 2025
2951d4e
feat(chain_index): Add cleanup logic for historical blocks in ChainIndex
Apr 22, 2025
65f5779
chore: rm fmt print
Apr 22, 2025
981b6cb
Merge branch 'main' into followup-pre-populate-chain-validity-window
Elvis339 Apr 23, 2025
f3700b7
Merge branch 'main' of github.com:ava-labs/hypersdk into followup-pre…
Elvis339 Apr 28, 2025
a5f691b
chore: address PR comments regarding chain index
Elvis339 Apr 29, 2025
94538b1
chore: address PR comments regarding validity window
Elvis339 Apr 29, 2025
b691b2b
Merge branch 'main' into followup-pre-populate-chain-validity-window
Elvis339 Apr 30, 2025
1ae7c6e
Merge branch 'main' into followup-pre-populate-chain-validity-window
Elvis339 May 2, 2025
fcf1b4f
docs(validitywindow)
Elvis339 May 5, 2025
248dac9
nits
Elvis339 May 5, 2025
2ab420f
Merge branch 'main' into followup-pre-populate-chain-validity-window
Elvis339 May 5, 2025
58f8cea
Merge branch 'main' into followup-pre-populate-chain-validity-window
Elvis339 May 12, 2025
1819aad
feat(validitywindow): remove explicit `head` nil check
Elvis339 May 12, 2025
a3d576a
docs(validitywindow)
Elvis339 May 12, 2025
8d00108
docs(validitywindow)
Elvis339 May 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions chain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ type Chain struct {
func NewChain(
tracer trace.Tracer,
registerer *prometheus.Registry,
parser Parser,
mempool Mempool,
logger logging.Logger,
ruleFactory RuleFactory,
metadataManager MetadataManager,
balanceHandler BalanceHandler,
authVerifiers workers.Workers,
authEngines AuthEngines,
blockParser *BlockParser,
validityWindow ValidityWindow,
config Config,
) (*Chain, error) {
Expand Down Expand Up @@ -72,7 +72,7 @@ func NewChain(
metadataManager,
balanceHandler,
),
blockParser: NewBlockParser(tracer, parser),
blockParser: blockParser,
accepter: NewAccepter(tracer, validityWindow, metrics),
}, nil
}
Expand Down
49 changes: 28 additions & 21 deletions chain/chaintest/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,27 +278,6 @@ func (test *BlockBenchmark[T]) Run(ctx context.Context, b *testing.B) {
}

chainIndex := &validitywindowtest.MockChainIndex[*chain.Transaction]{}
validityWindow := validitywindow.NewTimeValidityWindow(
logging.NoLog{},
trace.Noop,
chainIndex,
func(timestamp int64) int64 {
return test.RuleFactory.GetRules(timestamp).GetValidityWindow()
},
)

processor := chain.NewProcessor(
trace.Noop,
&logging.NoLog{},
test.RuleFactory,
processorWorkers,
test.AuthEngines,
test.MetadataManager,
test.BalanceHandler,
validityWindow,
metrics,
test.Config,
)

factories, keys, genesis, err := test.GenesisF(test.NumTxsPerBlock)
r.NoError(err)
Expand Down Expand Up @@ -343,6 +322,34 @@ func (test *BlockBenchmark[T]) Run(ctx context.Context, b *testing.B) {
chainIndex.Set(blk.GetID(), blk)
}

// Populate a validity window starting from the last block from chain-index
head := blocks[len(blocks)-1]

validityWindow, err := validitywindow.NewTimeValidityWindow(
ctx,
logging.NoLog{},
trace.Noop,
chainIndex,
head,
func(timestamp int64) int64 {
return test.RuleFactory.GetRules(timestamp).GetValidityWindow()
},
)
r.NoError(err)

processor := chain.NewProcessor(
trace.Noop,
&logging.NoLog{},
test.RuleFactory,
processorWorkers,
test.AuthEngines,
test.MetadataManager,
test.BalanceHandler,
validityWindow,
metrics,
test.Config,
)

var parentView merkledb.View
parentView = db
b.ResetTimer()
Expand Down
155 changes: 135 additions & 20 deletions chainindex/chain_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"encoding/binary"
"errors"
"fmt"
"math/rand"
"time"

Expand Down Expand Up @@ -65,6 +66,7 @@ type Parser[T Block] interface {
}

func New[T Block](
ctx context.Context,
log logging.Logger,
registry prometheus.Registerer,
config Config,
Expand All @@ -79,15 +81,21 @@ func New[T Block](
return nil, errBlockCompactionFrequencyZero
}

return &ChainIndex[T]{
ci := &ChainIndex[T]{
config: config,
// Offset by random number to ensure the network does not compact simultaneously
compactionOffset: rand.Uint64() % config.BlockCompactionFrequency, //nolint:gosec
metrics: metrics,
log: log,
db: db,
parser: parser,
}, nil
}

if err := ci.cleanupOnStartup(ctx); err != nil {
return nil, err
}

return ci, nil
}

func (c *ChainIndex[T]) GetLastAcceptedHeight(_ context.Context) (uint64, error) {
Expand All @@ -101,19 +109,11 @@ func (c *ChainIndex[T]) GetLastAcceptedHeight(_ context.Context) (uint64, error)
func (c *ChainIndex[T]) UpdateLastAccepted(ctx context.Context, blk T) error {
batch := c.db.NewBatch()

var (
blkID = blk.GetID()
height = blk.GetHeight()
blkBytes = blk.GetBytes()
)
height := blk.GetHeight()
heightBytes := binary.BigEndian.AppendUint64(nil, height)
err := errors.Join(
if err := errors.Join(
batch.Put(lastAcceptedKey, heightBytes),
batch.Put(prefixBlockIDHeightKey(blkID), heightBytes),
batch.Put(prefixBlockHeightIDKey(height), blkID[:]),
batch.Put(prefixBlockKey(height), blkBytes),
)
if err != nil {
c.writeBlock(batch, blk)); err != nil {
return err
}

Expand All @@ -122,17 +122,15 @@ func (c *ChainIndex[T]) UpdateLastAccepted(ctx context.Context, blk T) error {
return batch.Write()
}

if err := batch.Delete(prefixBlockKey(expiryHeight)); err != nil {
return err
}
deleteBlkID, err := c.GetBlockIDAtHeight(ctx, expiryHeight)
if err != nil {
return err
}
if err := batch.Delete(prefixBlockIDHeightKey(deleteBlkID)); err != nil {
return err
}
if err := batch.Delete(prefixBlockHeightIDKey(expiryHeight)); err != nil {
if err = errors.Join(
batch.Delete(prefixBlockKey(expiryHeight)),
batch.Delete(prefixBlockIDHeightKey(deleteBlkID)),
batch.Delete(prefixBlockHeightIDKey(expiryHeight)),
); err != nil {
return err
}
c.metrics.deletedBlocks.Inc()
Expand All @@ -151,6 +149,18 @@ func (c *ChainIndex[T]) UpdateLastAccepted(ctx context.Context, blk T) error {
return batch.Write()
}

// SaveHistorical persists blk to disk without updating lastAcceptedKey.
// It is intended for historical blocks only: it relies on a later call to
// UpdateLastAccepted, which is responsible for pruning expired blocks.
func (c *ChainIndex[T]) SaveHistorical(blk T) error {
	batch := c.db.NewBatch()
	if err := c.writeBlock(batch, blk); err != nil {
		return err
	}
	return batch.Write()
}

func (c *ChainIndex[T]) GetBlock(ctx context.Context, blkID ids.ID) (T, error) {
height, err := c.GetBlockIDHeight(ctx, blkID)
if err != nil {
Expand Down Expand Up @@ -183,6 +193,105 @@ func (c *ChainIndex[T]) GetBlockByHeight(ctx context.Context, blkHeight uint64)
return c.parser.ParseBlock(ctx, blkBytes)
}

// writeBlock stages the three index entries for blk into batch:
// blockID -> height, height -> blockID, and height -> raw block bytes.
func (_ *ChainIndex[T]) writeBlock(batch database.Batch, blk T) error {
	id := blk.GetID()
	height := blk.GetHeight()
	heightBytes := binary.BigEndian.AppendUint64(nil, height)
	return errors.Join(
		batch.Put(prefixBlockIDHeightKey(id), heightBytes),
		batch.Put(prefixBlockHeightIDKey(height), id[:]),
		batch.Put(prefixBlockKey(height), blk.GetBytes()),
	)
}

// cleanupOnStartup deletes historical blocks that fall outside the accepted
// block window, then schedules a single asynchronous compaction over the
// deleted key range.
//
// Layout relative to the last accepted block:
//
//	| <--- Historical Blocks (delete) ---> | <--- AcceptedBlockWindow ---> | Last Accepted |
//
// Blocks strictly below (lastAcceptedHeight - AcceptedBlockWindow) are
// removed, except the genesis block at height 0, which is always retained.
func (c *ChainIndex[T]) cleanupOnStartup(ctx context.Context) error {
	lastAcceptedHeight, err := c.GetLastAcceptedHeight(ctx)
	// A missing lastAcceptedKey means no block was accepted yet; treat it as
	// height 0 rather than an error. Use errors.Is so a wrapped sentinel is
	// still recognized.
	if err != nil && !errors.Is(err, database.ErrNotFound) {
		return err
	}

	// If there's no accepted window or lastAcceptedHeight is too small, nothing to clean
	if c.config.AcceptedBlockWindow == 0 || lastAcceptedHeight <= c.config.AcceptedBlockWindow {
		return nil
	}

	thresholdHeight := lastAcceptedHeight - c.config.AcceptedBlockWindow

	c.log.Debug("cleaning up historical blocks outside accepted window",
		zap.Uint64("lastAcceptedHeight", lastAcceptedHeight),
		zap.Uint64("thresholdHeight", thresholdHeight),
		zap.Uint64("acceptedBlockWindow", c.config.AcceptedBlockWindow))

	it := c.db.NewIteratorWithPrefix([]byte{blockHeightIDPrefix})
	defer it.Release()

	batch := c.db.NewBatch()
	var lastDeletedHeight uint64

	for it.Next() {
		height := extractBlockHeightFromKey(it.Key())

		// Heights are stored big-endian under a common prefix, so iteration is
		// in ascending height order: once we reach the threshold, every
		// remaining block is inside the accepted window.
		if height >= thresholdHeight {
			break
		}

		// Never delete the genesis block (height 0).
		if height == 0 {
			continue
		}

		deleteBlkID, err := c.GetBlockIDAtHeight(ctx, height)
		if err != nil {
			return err
		}

		// Remove all three index entries for this block.
		if err = errors.Join(
			batch.Delete(prefixBlockKey(height)),
			batch.Delete(prefixBlockIDHeightKey(deleteBlkID)),
			batch.Delete(prefixBlockHeightIDKey(height)),
		); err != nil {
			return err
		}
		c.metrics.deletedBlocks.Inc()

		// Track the highest deleted height to bound the compaction below.
		lastDeletedHeight = height
	}

	if err := it.Error(); err != nil {
		return fmt.Errorf("iterator error during cleanup: %w", err)
	}

	// Commit all staged deletions atomically.
	if err := batch.Write(); err != nil {
		return err
	}

	// Perform a single compaction over the deleted range; run it in the
	// background so startup is not blocked.
	if lastDeletedHeight > 0 {
		go func() {
			start := time.Now()
			if err := c.db.Compact([]byte{blockPrefix}, prefixBlockKey(lastDeletedHeight)); err != nil {
				c.log.Error("failed to compact block store", zap.Error(err))
				return
			}
			c.log.Info("compacted disk blocks", zap.Uint64("end", lastDeletedHeight), zap.Duration("t", time.Since(start)))
		}()
	}

	return nil
}

func prefixBlockKey(height uint64) []byte {
k := make([]byte, 1+consts.Uint64Len)
k[0] = blockPrefix
Expand All @@ -203,3 +312,9 @@ func prefixBlockHeightIDKey(height uint64) []byte {
binary.BigEndian.PutUint64(k[1:], height)
return k
}

// extractBlockHeightFromKey extracts block height from the key.
// The key is expected to be in the format: [1-byte prefix][8-byte big-endian encoded uint64]
func extractBlockHeightFromKey(key []byte) uint64 {
return binary.BigEndian.Uint64(key[1:])
}
Loading