Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

consortium-v2/snapshot: prune old, unused snapshots #467

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions cmd/ronin/dbcmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package main

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"os"
Expand All @@ -32,6 +33,7 @@ import (
"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
v2 "github.com/ethereum/go-ethereum/consensus/consortium/v2"
"github.com/ethereum/go-ethereum/console/prompt"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/ethdb"
Expand Down Expand Up @@ -69,6 +71,7 @@ Remove blockchain and state databases`,
dbDumpFreezerIndex,
dbImportCmd,
dbExportCmd,
dbPruneConsortiumSnapshotCmd,
},
}
dbInspectCmd = &cli.Command{
Expand Down Expand Up @@ -243,6 +246,20 @@ WARNING: This is a low-level operation which may cause database corruption!`,
},
Description: "Exports the specified chain data to an RLP encoded stream, optionally gzip-compressed.",
}
dbPruneConsortiumSnapshotCmd = &cli.Command{
Name: "prune-consortium-snapshot",
Usage: "Prune all snapshots except the latest ones",
Action: pruneSnapshot,
Category: "MISCELLANEOUS COMMANDS",
Flags: []cli.Flag{
utils.SnapshotKeepAfterPruningFlag,
utils.DataDirFlag,
},
Description: `
Prune all consortium snapshots except the latest ones. The number of snapshots to keep
can be specified via "--snapshot.keep-after-pruning" flag. The default value is 10.
`,
}
)

func removeDB(ctx *cli.Context) error {
Expand Down Expand Up @@ -695,3 +712,60 @@ func exportChaindata(ctx *cli.Context) error {
db := utils.MakeChainDatabase(ctx, stack, true)
return utils.ExportChaindata(ctx.Args().Get(1), kind, exporter(db), stop)
}

func pruneSnapshot(ctx *cli.Context) error {
keep := ctx.Int(utils.SnapshotKeepAfterPruningFlag.Name)
if keep < 0 {
log.Error("Invalid keep value", "keep", keep)
return errors.New("invalid keep value")
}
log.Info("Snapshot pruning", "latest snapshots keep: ", keep)
// Open the chain database
stack, _ := makeConfigNode(ctx)
defer stack.Close()

db := utils.MakeChainDatabase(ctx, stack, false)

// Get all snapshots (hash, block number) from the database
nSnapshots := 0
snapshots := make(map[common.Hash]uint64)
it := rawdb.GetSnapshotsIterator(db)
defer it.Release()
for it.Next() {
snap := new(v2.Snapshot)
if err := json.Unmarshal(it.Value(), snap); err != nil {
return err
}
snapshots[snap.Hash] = snap.Number
nSnapshots++
}
log.Info("Found all snapshots", "nSnapshots", nSnapshots)

// Sort the snapshots by block number
hashes := make([]common.Hash, 0, nSnapshots)
for hash := range snapshots {
hashes = append(hashes, hash)
}
sort.Slice(hashes, func(i, j int) bool {
return snapshots[hashes[i]] < snapshots[hashes[j]]
})

// Prune the snapshots
if nSnapshots < keep {
keep = nSnapshots
}
nSnapshotsPrune := nSnapshots - keep
batch := db.NewBatch()
for i := 0; i < nSnapshotsPrune; i++ {
if err := rawdb.DeleteSnapshotConsortium(batch, hashes[i]); err != nil {
log.Error("Failed to delete snapshot", "hash", hashes[i], "err", err)
return err
}
}
if err := batch.Write(); err != nil {
log.Error("Failed to write batch", "err", err)
return err
}
log.Info("Pruned snapshots", "snapshots", nSnapshotsPrune)
return nil
}
6 changes: 6 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -1108,6 +1108,12 @@ var (
Usage: "List of mock stake amounts which are reflect 1:1 with mock.validators",
Category: flags.MockCategory,
}

SnapshotKeepAfterPruningFlag = &cli.IntFlag{
Name: "snapshot.keep-after-pruning",
Usage: "The number of lastest snapshots to keep after pruning (default 200 * 144 = 28800 ~ 1 day)",
Value: 200 * 144,
}
)

// MakeDataDir retrieves the currently requested data directory, terminating
Expand Down
7 changes: 7 additions & 0 deletions consensus/consortium/v2/consortium.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,9 @@ func (c *Consortium) snapshot(chain consensus.ChainHeaderReader, number uint64,
if err := snap.store(c.db); err != nil {
return nil, err
}
if err := snap.pruneSnapshotPeriodically(c.db, chain); err != nil {
return nil, err
}
log.Info("Stored checkpoint snapshot to disk", "number", number, "hash", hash)
figure.NewColorFigure("Welcome to DPOS", "", "green", true).Print()
break
Expand Down Expand Up @@ -782,6 +785,10 @@ func (c *Consortium) snapshot(chain consensus.ChainHeaderReader, number uint64,
if err = snap.store(c.db); err != nil {
return nil, err
}
// Prune the snapshot periodically
if err := snap.pruneSnapshotPeriodically(c.db, chain); err != nil {
return nil, err
}
log.Trace("Stored snapshot to disk", "number", snap.Number, "hash", snap.Hash)
}
log.Trace("Checking snapshot data", "number", snap.Number, "validators", snap.validators())
Expand Down
48 changes: 48 additions & 0 deletions consensus/consortium/v2/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,22 @@ import (
blsCommon "github.com/ethereum/go-ethereum/crypto/bls/common"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/hashicorp/golang-lru/arc/v2"
)

const (
blocksPerEpoch = 200
epochsPerPeriod = 144
)

var (
latestSnapshotsKeep = blocksPerEpoch * epochsPerPeriod * 5 // 5 days
snapshotsToBePruned = epochsPerPeriod * 2 // 2 days
pruningPeriod = blocksPerEpoch * epochsPerPeriod * 1 // every 1 day
)

// Snapshot is the state of the authorization validators at a given point in time.
type Snapshot struct {
// private fields are not json.Marshalled
Expand Down Expand Up @@ -113,6 +125,42 @@ func loadSnapshot(
return snap, nil
}

// snapshot pruning
// delete the nSnapshotsPrune oldest snapshots, keep the latestSnapshotsKeep snapshots
func (s *Snapshot) pruneSnapshot(db ethdb.Database, nSnapshotPrune int, chain consensus.ChainHeaderReader) error {
log.Info("Pruning snapshots at block", "block", s.Number, "nSnapshotPrune", nSnapshotPrune)
// Get block number to start pruning
curBlockNumber := s.Number
curBlockNumber -= curBlockNumber % uint64(blocksPerEpoch) // start of the current epoch
curBlockNumber -= uint64(latestSnapshotsKeep) // start of the oldest epoch to keep

// delete nSnapshotPrune snapshots starting from curBlockNumber to the older ones
batch := db.NewBatch()
for nSnapshotPrune > 0 {
nSnapshotPrune--
header := chain.GetHeaderByNumber(curBlockNumber)
if header == nil {
// no more snapshots to prune
break
}
curHash := header.Hash()
if err := rawdb.DeleteSnapshotConsortium(batch, curHash); err != nil {
return err
}
curBlockNumber -= uint64(blocksPerEpoch)
}
log.Info("Pruned snapshots done")
return batch.Write()
}

// periodically prune the snapshots at the start of each pruningPeriod
func (s *Snapshot) pruneSnapshotPeriodically(db ethdb.Database, chain consensus.ChainHeaderReader) error {
if s.Number%uint64(pruningPeriod) == 0 {
return s.pruneSnapshot(db, snapshotsToBePruned, chain)
}
return nil
}

// store inserts the snapshot into the database.
func (s *Snapshot) store(db ethdb.Database) error {
blob, err := json.Marshal(s)
Expand Down
6 changes: 6 additions & 0 deletions core/rawdb/accessors_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,3 +231,9 @@ func WriteSnapshotConsortium(db ethdb.KeyValueWriter, hash common.Hash, snapshot
func DeleteSnapshotConsortium(db ethdb.KeyValueWriter, hash common.Hash) error {
return db.Delete(snapshotConsortiumKey(hash))
}

// GetSnapshotsIterator returns an iterator for walking the entire snapshot
func GetSnapshotsIterator(db ethdb.Database) ethdb.Iterator {
it := db.NewIterator(snapshotConsortiumPrefix, nil)
return it
}
Loading