Skip to content

Add hook to force committing state to disk #280

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 15 commits into
base: snap_sync
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions arbitrum/handler_p2p.go
Original file line number Diff line number Diff line change
@@ -39,10 +39,10 @@ import (
)

type SyncHelper interface {
LastConfirmed() (*types.Header, uint64, error)
LastConfirmed() (*types.Header, uint64, uint64, error)
LastCheckpoint() (*types.Header, error)
CheckpointSupported(*types.Header) (bool, error)
ValidateConfirmed(*types.Header, uint64) (bool, error)
ValidateConfirmed(*types.Header, uint64, uint64) (bool, error)
}

type Peer struct {
@@ -271,7 +271,7 @@ func (h *arbHandler) PeerInfo(id enode.ID) interface{} {
return nil
}

func (h *arbHandler) HandleLastConfirmed(peer *arb.Peer, confirmed *types.Header, node uint64) {
func (h *arbHandler) HandleLastConfirmed(peer *arb.Peer, confirmed *types.Header, l1BlockNumber uint64, node uint64) {
protoHandler := (*protocolHandler)(h)
validated := false
valid := false
@@ -284,7 +284,7 @@ func (h *arbHandler) HandleLastConfirmed(peer *arb.Peer, confirmed *types.Header
}
if !validated {
var err error
valid, err = h.helper.ValidateConfirmed(confirmed, node)
valid, err = h.helper.ValidateConfirmed(confirmed, l1BlockNumber, node)
if err != nil {
log.Error("error in validate confirmed", "id", peer.ID(), "err", err)
return
@@ -343,7 +343,7 @@ func (h *arbHandler) HandleCheckpoint(peer *arb.Peer, checkpoint *types.Header,
protoHandler.advanceCheckpoint(checkpoint)
}

func (h *arbHandler) LastConfirmed() (*types.Header, uint64, error) {
func (h *arbHandler) LastConfirmed() (*types.Header, uint64, uint64, error) {
return h.helper.LastConfirmed()
}

6 changes: 3 additions & 3 deletions arbitrum/sync_test.go
Original file line number Diff line number Diff line change
@@ -61,8 +61,8 @@ type dummySyncHelper struct {
checkpoint *types.Header
}

func (d *dummySyncHelper) LastConfirmed() (*types.Header, uint64, error) {
return d.confirmed, 0, nil
func (d *dummySyncHelper) LastConfirmed() (*types.Header, uint64, uint64, error) {
return d.confirmed, 0, 0, nil
}

func (d *dummySyncHelper) LastCheckpoint() (*types.Header, error) {
@@ -76,7 +76,7 @@ func (d *dummySyncHelper) CheckpointSupported(*types.Header) (bool, error) {
return true, nil
}

func (d *dummySyncHelper) ValidateConfirmed(header *types.Header, node uint64) (bool, error) {
func (d *dummySyncHelper) ValidateConfirmed(header *types.Header, l1BlockNumber uint64, node uint64) (bool, error) {
if d.confirmed == nil {
return true, nil
}
35 changes: 33 additions & 2 deletions core/blockchain.go
Original file line number Diff line number Diff line change
@@ -284,11 +284,26 @@ type BlockChain struct {

numberOfBlocksToSkipStateSaving uint32
amountOfGasInBlocksToSkipStateSaving uint64
forceTriedbCommitHook ForceTriedbCommitHook
processingSinceLastForceCommit time.Duration
gasSinceLastForceCommit uint64
}

type ForceTriedbCommitHook func(*types.Block, time.Duration, uint64) bool

type trieGcEntry struct {
Root common.Hash
Timestamp uint64
GasUsed uint64
}

func NewArbBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig, genesis *Genesis, overrides *ChainOverrides, engine consensus.Engine, vmConfig vm.Config, shouldPreserve func(header *types.Header) bool, txLookupLimit *uint64, forceTriedbCommitHook ForceTriedbCommitHook) (*BlockChain, error) {
bc, err := NewBlockChain(db, cacheConfig, chainConfig, genesis, overrides, engine, vmConfig, shouldPreserve, txLookupLimit)
if err != nil {
return nil, err
}
bc.forceTriedbCommitHook = forceTriedbCommitHook
return bc, nil
}

// NewBlockChain returns a fully initialised block chain using information
@@ -1493,7 +1508,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.

// Full node or sparse archive node that's not keeping all states, do proper garbage collection
bc.triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive
bc.triegc.Push(trieGcEntry{root, block.Header().Time}, -int64(block.NumberU64()))
bc.triegc.Push(trieGcEntry{root, block.Header().Time, block.GasUsed()}, -int64(block.NumberU64()))

blockLimit := int64(block.NumberU64()) - int64(bc.cacheConfig.TriesInMemory) // only cleared if below that
timeLimit := time.Now().Unix() - int64(bc.cacheConfig.TrieRetention.Seconds()) // only cleared if less than that
@@ -1509,6 +1524,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
}
var prevEntry *trieGcEntry
var prevNum uint64
var forceCommit bool
// Garbage collect anything below our required write retention
for !bc.triegc.Empty() {
triegcEntry, number := bc.triegc.Pop()
@@ -1517,15 +1533,24 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
break
}
if prevEntry != nil {
bc.gasSinceLastForceCommit += block.GasUsed()
if bc.forceTriedbCommitHook != nil && bc.forceTriedbCommitHook(block, bc.gcproc+bc.processingSinceLastForceCommit, bc.gasSinceLastForceCommit) {
forceCommit = true
break
}
bc.triedb.Dereference(prevEntry.Root)
}
prevEntry = &triegcEntry
prevNum = uint64(-number)
}
if prevEntry != nil && bc.forceTriedbCommitHook != nil && !forceCommit {
bc.gasSinceLastForceCommit += block.GasUsed()
forceCommit = bc.forceTriedbCommitHook(block, bc.gcproc+bc.processingSinceLastForceCommit, bc.gasSinceLastForceCommit)
}
flushInterval := time.Duration(bc.flushInterval.Load())
// If we exceeded out time allowance, flush an entire trie to disk
// In case of archive node that skips some trie commits we don't flush tries here
if bc.gcproc > flushInterval && prevEntry != nil && !archiveNode {
if prevEntry != nil && ((bc.gcproc > flushInterval && !archiveNode) || forceCommit) {
// If the header is missing (canonical chain behind), we're reorging a low
// diff sidechain. Suspend committing until this operation is completed.
header := bc.GetHeaderByNumber(prevNum)
@@ -1540,6 +1565,12 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
// Flush an entire trie and restart the counters
bc.triedb.Commit(header.Root, true)
bc.lastWrite = prevNum
if !forceCommit {
bc.processingSinceLastForceCommit += bc.gcproc
} else {
bc.processingSinceLastForceCommit = 0
bc.gasSinceLastForceCommit = 0
}
bc.gcproc = 0
}
}
9 changes: 5 additions & 4 deletions eth/protocols/arb/handler.go
Original file line number Diff line number Diff line change
@@ -81,13 +81,14 @@ func HandleMessage(backend Backend, peer *Peer) error {
}
switch {
case msg.Code == GetLastConfirmedMsg:
confirmed, node, err := backend.LastConfirmed()
confirmed, l1BlockNumber, node, err := backend.LastConfirmed()
if err != nil || confirmed == nil {
return err
}
response := LastConfirmedMsgPacket{
Header: confirmed,
Node: node,
Header: confirmed,
L1BlockNumber: l1BlockNumber,
Node: node,
}
return p2p.Send(peer.rw, LastConfirmedMsg, &response)
case msg.Code == LastConfirmedMsg:
@@ -99,7 +100,7 @@ func HandleMessage(backend Backend, peer *Peer) error {
if incoming.Header == nil {
return nil
}
backend.HandleLastConfirmed(peer, incoming.Header, incoming.Node)
backend.HandleLastConfirmed(peer, incoming.Header, incoming.L1BlockNumber, incoming.Node)
return nil
case msg.Code == GetLastCheckpointMsg:
checkpoint, err := backend.LastCheckpoint()
9 changes: 5 additions & 4 deletions eth/protocols/arb/protocol.go
Original file line number Diff line number Diff line change
@@ -34,8 +34,9 @@ const (
)

type LastConfirmedMsgPacket struct {
Header *types.Header
Node uint64
Header *types.Header
L1BlockNumber uint64
Node uint64
}

type CheckpointMsgPacket struct {
@@ -62,9 +63,9 @@ type Handler func(peer *Peer) error
// callback methods to invoke on remote deliveries.
type Backend interface {
PeerInfo(id enode.ID) interface{}
HandleLastConfirmed(peer *Peer, confirmed *types.Header, node uint64)
HandleLastConfirmed(peer *Peer, confirmed *types.Header, l1BlockNumber uint64, node uint64)
HandleCheckpoint(peer *Peer, header *types.Header, supported bool)
LastConfirmed() (*types.Header, uint64, error)
LastConfirmed() (*types.Header, uint64, uint64, error)
LastCheckpoint() (*types.Header, error)
CheckpointSupported(*types.Header) (bool, error)
// RunPeer is invoked when a peer joins on the `eth` protocol. The handler