2 changes: 2 additions & 0 deletions cmd/mithril/node/node.go
@@ -187,6 +187,7 @@ func init() {
Run.Flags().BoolVar(&sbpf.UsePool, "use-pool", true, "Disable to allocate fresh slices")
Run.Flags().IntVar(&accountsdb.StoreAccountsWorkers, "store-accounts-workers", 128, "Number of workers to write account updates")
Run.Flags().BoolVar(&accountsdb.StoreAsync, "store-async", false, "Store accounts asynchronously")
+Run.Flags().BoolVar(&replay.EnablePrefetcher, "enable-prefetcher", false, "Enable accounts prefetch")

// [tuning.pprof] section flags
Run.Flags().Int64Var(&pprofPort, "pprof-port", -1, "Port to serve HTTP pprof endpoint")
@@ -525,6 +526,7 @@ func initConfigAndBindFlags(cmd *cobra.Command) error {
sbpf.UsePool = getBool("use-pool", "tuning.use_pool")
accountsdb.StoreAccountsWorkers = getInt("store-accounts-workers", "tuning.store_accounts_workers")
accountsdb.StoreAsync = getBool("store-async", "tuning.store_async")
+replay.EnablePrefetcher = getBool("enable-prefetcher", "tuning.enable_prefetcher")

return nil
}
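Review note: given the getBool binding above, the new flag should also be settable from the config file. A minimal sketch, assuming a TOML config; the [tuning] section and key name are inferred from the binding and are not shown in this diff:

[tuning]
enable_prefetcher = true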
77 changes: 64 additions & 13 deletions pkg/accountsdb/accountsdb.go
@@ -34,8 +34,12 @@ type AccountsDb struct {
CommonAcctsCache otter.Cache[solana.PublicKey, *accounts.Account]
ProgramCache otter.Cache[solana.PublicKey, *ProgramCacheEntry]

-// A list of store requests. They are added to the back as they arrive and
-// removed from the front as they are persisted.
+// A list of storeRequest | prefetchRequest elements. They are added to the
+// back as they arrive and removed from the front as they are persisted.
+// AccountsDb readers should check the in-progress store requests for the
+// latest account states. prefetchRequests don't modify disk state, but the
+// store worker processes both request types serially to avoid difficulties
+// in keeping disk and cache in sync.
inProgressStoreRequestsMu sync.Mutex
inProgressStoreRequests *list.List
storeRequestChan chan *list.Element
@@ -55,6 +59,10 @@ type storeRequest struct {
cb func()
}

+type prefetchRequest struct {
+keys map[solana.PublicKey]struct{}
+}

// silentLogger implements pebble.Logger but discards all messages.
// This suppresses verbose WAL recovery messages on startup.
type silentLogger struct{}
@@ -115,7 +123,7 @@ func OpenDb(accountsDbDir string) (*AccountsDb, error) {
mlog.Log.Infof("StoreAsync=%t", StoreAsync)
if StoreAsync {
accountsDb.inProgressStoreRequests = list.New()
-accountsDb.storeRequestChan = make(chan *list.Element)
+accountsDb.storeRequestChan = make(chan *list.Element, 1)
accountsDb.storeWorkerDone = make(chan struct{})
go accountsDb.storeWorker()
}
@@ -206,20 +214,25 @@ func (accountsDb *AccountsDb) GetAccount(slot uint64, pubkey solana.PublicKey) (
return accts[0], nil
}
}
-return accountsDb.getStoredAccount(slot, pubkey)
+return accountsDb.getStoredAccount(pubkey)
}

-func (accountsDb *AccountsDb) getStoredAccount(slot uint64, pubkey solana.PublicKey) (*accounts.Account, error) {
+func (accountsDb *AccountsDb) getStoredAccount(pubkey solana.PublicKey) (*accounts.Account, error) {
+r := trace.StartRegion(context.Background(), "GetStoredAccountCache")
cachedAcct, hasAcct := accountsDb.VoteAcctCache.Get(pubkey)
if hasAcct {
+r.End()
return cachedAcct, nil
}

cachedAcct, hasAcct = accountsDb.CommonAcctsCache.Get(pubkey)
if hasAcct {
+r.End()
return cachedAcct, nil
}
+r.End()

+defer trace.StartRegion(context.Background(), "GetStoredAccountDisk").End()
acctIdxEntryBytes, c, err := accountsDb.Index.Get(pubkey[:])
if err != nil {
//mlog.Log.Debugf("no account found in accountsdb for pubkey %s: %s", pubkey, err)
@@ -276,12 +289,16 @@ func (accountsDb *AccountsDb) getStoredAccount(slot uint64, pubkey solana.Public
// Returns a slice of the same length as the input with results matching indexes, nil if not found.
// Returns clones to avoid data races with the store worker.
func (accountsDb *AccountsDb) getStoreInProgressAccounts(pks []solana.PublicKey) []*accounts.Account {
+defer trace.StartRegion(context.Background(), "getStoreInProgressAccounts").End()
out := make([]*accounts.Account, len(pks))
accountsDb.inProgressStoreRequestsMu.Lock()
defer accountsDb.inProgressStoreRequestsMu.Unlock()
// Start with newest first.
for e := accountsDb.inProgressStoreRequests.Back(); e != nil; e = e.Prev() {
-sr := e.Value.(storeRequest)
+sr, ok := e.Value.(storeRequest)
+if !ok {
+continue
+}
for i := range len(pks) {
if out[i] != nil {
continue // Already found.
@@ -357,13 +374,27 @@ func (accountsDb *AccountsDb) storeAccountsSync(accts []*accounts.Account, slot
func (accountsDb *AccountsDb) storeWorker() {
defer close(accountsDb.storeWorkerDone)
for elt := range accountsDb.storeRequestChan {
-sr := elt.Value.(storeRequest)
-accountsDb.storeAccountsSync(sr.accts, sr.slot)
-accountsDb.inProgressStoreRequestsMu.Lock()
-accountsDb.inProgressStoreRequests.Remove(elt)
-accountsDb.inProgressStoreRequestsMu.Unlock()
-if sr.cb != nil {
-sr.cb()
+switch v := elt.Value.(type) {
+case storeRequest:
+accountsDb.storeAccountsSync(v.accts, v.slot)
+accountsDb.inProgressStoreRequestsMu.Lock()
+accountsDb.inProgressStoreRequests.Remove(elt)
+accountsDb.inProgressStoreRequestsMu.Unlock()
+if v.cb != nil {
+v.cb()
}
+
+case prefetchRequest:
+accountsDb.prefetchSync(v.keys)
+accountsDb.inProgressStoreRequestsMu.Lock()
+accountsDb.inProgressStoreRequests.Remove(elt)
+accountsDb.inProgressStoreRequestsMu.Unlock()
+
+default:
+mlog.Log.Errorf("storeWorker: unknown request type %T", elt.Value)
+accountsDb.inProgressStoreRequestsMu.Lock()
+accountsDb.inProgressStoreRequests.Remove(elt)
+accountsDb.inProgressStoreRequestsMu.Unlock()
+}
}
}
@@ -602,6 +633,26 @@ func (accountsDb *AccountsDb) parallelStoreAccounts(n int, accts []*accounts.Acc
}
}

+func (accountsDb *AccountsDb) Prefetch(keys map[solana.PublicKey]struct{}) {
+if StoreAsync {
+accountsDb.inProgressStoreRequestsMu.Lock()
+element := accountsDb.inProgressStoreRequests.PushBack(prefetchRequest{keys})
+accountsDb.inProgressStoreRequestsMu.Unlock()
+accountsDb.storeRequestChan <- element
+} else {
+accountsDb.prefetchSync(keys)
+}
+}
+
+func (accountsDb *AccountsDb) prefetchSync(keys map[solana.PublicKey]struct{}) {
+defer trace.StartRegion(context.Background(), "Prefetch").End()
+keySlice := make([]solana.PublicKey, 0, len(keys))
+for k := range keys {
+keySlice = append(keySlice, k)
+}
+accountsDb.GetAccountsBatch(context.Background(), keySlice)
+}

func (accountsDb *AccountsDb) GetBankHashForSlot(slot uint64) ([]byte, error) {
var slotBytes [8]byte
binary.LittleEndian.PutUint64(slotBytes[:], slot)
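Review note: with StoreAsync enabled, the freshest state for a key may still be sitting in the in-progress queue rather than on disk, which is what the struct comment above asks readers to account for. A minimal package-internal read-path sketch; getLatest is a hypothetical helper, not part of this diff:

func (db *AccountsDb) getLatest(pk solana.PublicKey) (*accounts.Account, error) {
	// Pending stores are scanned newest-first, so a hit here wins over disk state.
	if accts := db.getStoreInProgressAccounts([]solana.PublicKey{pk}); accts[0] != nil {
		return accts[0], nil
	}
	// Fall back to the caches and the index-backed disk lookup.
	return db.getStoredAccount(pk)
}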
4 changes: 2 additions & 2 deletions pkg/accountsdb/batch.go
@@ -12,7 +12,7 @@ import (

var systemProgramAddr [32]byte

-func (db *AccountsDb) GetAccountsBatch(ctx context.Context, slot uint64, pks []solana.PublicKey) ([]*accounts.Account, error) {
+func (db *AccountsDb) GetAccountsBatch(ctx context.Context, pks []solana.PublicKey) ([]*accounts.Account, error) {
n := len(pks)
if n == 0 {
return nil, nil
@@ -49,7 +49,7 @@ func (db *AccountsDb) GetAccountsBatch(ctx context.Context, slot uint64, pks []s
default:
}

-acct, err := db.getStoredAccount(slot, key)
+acct, err := db.getStoredAccount(key)
if err != nil && err != ErrNoAccount {
select {
case errCh <- err:
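Review note: dropping the unused slot parameter from GetAccountsBatch is a breaking signature change, so call sites outside this diff need the argument removed as well. Hypothetical before/after at a caller:

accts, err := db.GetAccountsBatch(ctx, slot, pks) // old signature
accts, err := db.GetAccountsBatch(ctx, pks)       // new signature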
22 changes: 22 additions & 0 deletions pkg/block/block.go
@@ -69,3 +69,25 @@ type BlockRewardsInfo struct {
Lamports uint64
PostBalance uint64
}

+func (b *Block) UniqueAccounts() map[solana.PublicKey]struct{} {
+numPubkeys := 0
+for _, tx := range b.Transactions {
+numPubkeys += len(tx.Message.AccountKeys)
+}
+
+numPubkeys += len(b.UpdatedAccts)
+
+pubkeyMap := make(map[solana.PublicKey]struct{}, numPubkeys)
+
+for _, tx := range b.Transactions {
+for _, pk := range tx.Message.AccountKeys {
+pubkeyMap[pk] = struct{}{}
+}
+}
+
+for _, pk := range b.UpdatedAccts {
+pubkeyMap[pk] = struct{}{}
+}
+return pubkeyMap
+}
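Review note: numPubkeys is an upper bound (keys repeat across transactions), so the map is pre-sized generously to avoid growth rehashing during inserts. A hypothetical replay call site tying UniqueAccounts to the new flag; the actual wiring in pkg/replay is not part of this diff:

if replay.EnablePrefetcher {
	accountsDb.Prefetch(blk.UniqueAccounts())
}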
105 changes: 52 additions & 53 deletions pkg/blockstream/block_source.go
@@ -11,7 +11,6 @@ import (
"sync/atomic"
"time"

"github.com/Overclock-Validator/mithril/pkg/block"
b "github.com/Overclock-Validator/mithril/pkg/block"
"github.com/Overclock-Validator/mithril/pkg/mlog"
"github.com/Overclock-Validator/mithril/pkg/rpcclient"
@@ -30,9 +29,9 @@
type BlockSourceOpts struct {
RpcClient *rpcclient.RpcClient // Primary RPC for block fetching (getBlock)
SourceType BlockSourceType
-StartSlot uint64
-EndSlot uint64
-BlockDir string
+StartSlot uint64
+EndSlot uint64
+BlockDir string

// Backup RPC endpoints for failover (optional)
// These are tried in order if the primary fails with hard connectivity errors
@@ -161,35 +160,35 @@ type BlockSourceStats struct {
}

type BlockSource struct {
-rpcClients []*rpcclient.RpcClient // All RPC clients for block fetching (index 0 = primary)
-streamChan chan *b.Block
-startSlot uint64
-endSlot uint64
-currentSlot uint64
-blockDir string
-sourceType BlockSourceType
+rpcClients []*rpcclient.RpcClient // All RPC clients for block fetching (index 0 = primary)
+streamChan chan *b.Block
+startSlot uint64
+endSlot uint64
+currentSlot uint64
+blockDir string
+sourceType BlockSourceType

// RPC failover tracking
-activeRpcIdx atomic.Int32 // Currently active RPC index (0 = primary)
-slotsSinceFailover atomic.Uint64 // Slots emitted since failover (for retry timing)
-failoverCount atomic.Uint64 // Total failovers (for stats)
-hardErrCount atomic.Uint64 // Consecutive hard connectivity errors (reset on success)
-lastHardErrTime atomic.Int64 // Unix timestamp of last hard error (for time-windowing)
+activeRpcIdx atomic.Int32 // Currently active RPC index (0 = primary)
+slotsSinceFailover atomic.Uint64 // Slots emitted since failover (for retry timing)
+failoverCount atomic.Uint64 // Total failovers (for stats)
+hardErrCount atomic.Uint64 // Consecutive hard connectivity errors (reset on success)
+lastHardErrTime atomic.Int64 // Unix timestamp of last hard error (for time-windowing)

// Rate limiting
rateLimiter *rate.Limiter
maxInflight int

// Tip tracking
-confirmedTip atomic.Uint64
-processedTip atomic.Uint64 // Processed commitment tip (super tip)
-tipAtSlot atomic.Uint64 // What slot we had executed when tip was measured
-lastExecutedSlot atomic.Uint64 // Last slot fully executed by replay (set by SetLastExecutedSlot)
-tipSafetyMargin uint64
-tipPollInterval time.Duration
-lastTipUpdate atomic.Int64 // Unix timestamp of last successful tip poll
-tipPollFailures atomic.Uint64 // Consecutive tip poll failures
-totalTipPollFails atomic.Uint64 // Total tip poll failures (for stats)
+confirmedTip atomic.Uint64
+processedTip atomic.Uint64 // Processed commitment tip (super tip)
+tipAtSlot atomic.Uint64 // What slot we had executed when tip was measured
+lastExecutedSlot atomic.Uint64 // Last slot fully executed by replay (set by SetLastExecutedSlot)
+tipSafetyMargin uint64
+tipPollInterval time.Duration
+lastTipUpdate atomic.Int64 // Unix timestamp of last successful tip poll
+tipPollFailures atomic.Uint64 // Consecutive tip poll failures
+totalTipPollFails atomic.Uint64 // Total tip poll failures (for stats)

// Reorder buffer
reorderMu sync.Mutex
@@ -219,11 +218,11 @@ type BlockSource struct {
stallError atomic.Bool // Set when stall timeout triggers

// Stall diagnostics
-waitingSlotErrorsMu sync.Mutex
-waitingSlotErrors map[uint64]*slotErrorInfo // Per-slot error tracking
-lastStallHeartbeat atomic.Int64 // Unix timestamp of last stall heartbeat log
-lastFailoverTime atomic.Int64 // Unix timestamp of last RPC failover
-lastPriorityBlockedLog atomic.Int64 // Unix timestamp of last "priority slot blocked" log
+waitingSlotErrorsMu sync.Mutex
+waitingSlotErrors map[uint64]*slotErrorInfo // Per-slot error tracking
+lastStallHeartbeat atomic.Int64 // Unix timestamp of last stall heartbeat log
+lastFailoverTime atomic.Int64 // Unix timestamp of last RPC failover
+lastPriorityBlockedLog atomic.Int64 // Unix timestamp of last "priority slot blocked" log

// Near-tip mode tracking
isNearTip atomic.Bool // True when close to confirmed tip
@@ -333,23 +332,23 @@ func NewBlockSource(opts *BlockSourceOpts) *BlockSource {
}

bs := &BlockSource{
-rpcClients: rpcClients,
-streamChan: make(chan *b.Block, streamChanBuffer),
-startSlot: opts.StartSlot,
-endSlot: opts.EndSlot,
-currentSlot: opts.StartSlot,
-blockDir: opts.BlockDir,
-sourceType: opts.SourceType,
-rateLimiter: rate.NewLimiter(rate.Limit(maxRPS), maxRPS),
-maxInflight: maxInflight,
-tipSafetyMargin: tipSafetyMargin,
-tipPollInterval: time.Duration(tipPollMs) * time.Millisecond,
-reorderBuffer: make(map[uint64]*b.Block),
-skippedSlots: make(map[uint64]bool),
-nextSlotToSend: opts.StartSlot,
-maxPending: defaultMaxPending,
-slotState: make(map[uint64]slotStatus),
-inflightStart: make(map[uint64]time.Time),
+rpcClients: rpcClients,
+streamChan: make(chan *b.Block, streamChanBuffer),
+startSlot: opts.StartSlot,
+endSlot: opts.EndSlot,
+currentSlot: opts.StartSlot,
+blockDir: opts.BlockDir,
+sourceType: opts.SourceType,
+rateLimiter: rate.NewLimiter(rate.Limit(maxRPS), maxRPS),
+maxInflight: maxInflight,
+tipSafetyMargin: tipSafetyMargin,
+tipPollInterval: time.Duration(tipPollMs) * time.Millisecond,
+reorderBuffer: make(map[uint64]*b.Block),
+skippedSlots: make(map[uint64]bool),
+nextSlotToSend: opts.StartSlot,
+maxPending: defaultMaxPending,
+slotState: make(map[uint64]slotStatus),
+inflightStart: make(map[uint64]time.Time),
workQueue: make(chan uint64, maxInflight*2),
resultQueue: make(chan fetchResult, maxInflight*2),
stopChan: make(chan struct{}),
@@ -653,7 +652,7 @@ func (bs *BlockSource) logStallDiagnostics(prefix string) {
mlog.Log.Errorf("=== End %s ===", prefix)
}

-func (bs *BlockSource) tryGetBlockFromFile(slot uint64) (*block.Block, error) {
+func (bs *BlockSource) tryGetBlockFromFile(slot uint64) (*b.Block, error) {
if bs.blockDir == "" {
return nil, fmt.Errorf("no block directory specified")
}
Expand All @@ -664,7 +663,7 @@ func (bs *BlockSource) tryGetBlockFromFile(slot uint64) (*block.Block, error) {
}

decoder := json.NewDecoder(file)
-out := &block.Block{}
+out := &b.Block{}

err = decoder.Decode(out)
if err != nil {
@@ -785,7 +784,7 @@ func (bs *BlockSource) fetchBlockOnce(slot uint64, rpcIdx int32) (*b.Block, erro
return nil, err
}

-return block.FromBlockResult(blockResult, slot, rpc), nil
+return b.FromBlockResult(blockResult, slot, rpc), nil
}

// pollTip periodically updates the confirmed tip by querying all configured RPCs
@@ -1193,13 +1192,13 @@ func (bs *BlockSource) emitOrderedBlocks() {
if result.skipped {
bs.stats.FetchSkipped.Add(1)
bs.skippedSlots[result.slot] = true
-bs.hardErrCount.Store(0) // Reset on progress
+bs.hardErrCount.Store(0) // Reset on progress
bs.clearSlotErrors(result.slot) // Clear stall diagnostics for this slot
} else if result.block != nil {
// Success! This takes priority over any pending error results.
bs.stats.FetchSuccesses.Add(1)
bs.reorderBuffer[result.slot] = result.block
-bs.hardErrCount.Store(0) // Reset error count on success
+bs.hardErrCount.Store(0) // Reset error count on success
bs.clearSlotErrors(result.slot) // Clear stall diagnostics for this slot
// Track max buffered slot
if result.slot > bs.stats.MaxBufferedSlot.Load() {
@@ -1671,7 +1670,7 @@ func (bs *BlockSource) fetchAndParseBlockSequential(slot uint64) (*b.Block, erro
return nil, fmt.Errorf("error fetching block: %w", err)
}
}
-blk = block.FromBlockResult(blockResult, slot, rpc)
+blk = b.FromBlockResult(blockResult, slot, rpc)
}
} else if bs.sourceType == BlockSourceLightbringer {
// NOTE: BlockSourceLightbringer is TEMPORARILY NON-FUNCTIONAL.