diff --git a/cmd/mithril/node/node.go b/cmd/mithril/node/node.go
index edb7af08..150ac274 100644
--- a/cmd/mithril/node/node.go
+++ b/cmd/mithril/node/node.go
@@ -187,6 +187,7 @@ func init() {
 	Run.Flags().BoolVar(&sbpf.UsePool, "use-pool", true, "Disable to allocate fresh slices")
 	Run.Flags().IntVar(&accountsdb.StoreAccountsWorkers, "store-accounts-workers", 128, "Number of workers to write account updates")
 	Run.Flags().BoolVar(&accountsdb.StoreAsync, "store-async", false, "Store accounts asynchronously")
+	Run.Flags().BoolVar(&replay.EnablePrefetcher, "enable-prefetcher", false, "Enable accounts prefetch")
 
 	// [tuning.pprof] section flags
 	Run.Flags().Int64Var(&pprofPort, "pprof-port", -1, "Port to serve HTTP pprof endpoint")
@@ -525,6 +526,7 @@ func initConfigAndBindFlags(cmd *cobra.Command) error {
 	sbpf.UsePool = getBool("use-pool", "tuning.use_pool")
 	accountsdb.StoreAccountsWorkers = getInt("store-accounts-workers", "tuning.store_accounts_workers")
 	accountsdb.StoreAsync = getBool("store-async", "tuning.store_async")
+	replay.EnablePrefetcher = getBool("enable-prefetcher", "tuning.enable_prefetcher")
 
 	return nil
 }
diff --git a/pkg/accountsdb/accountsdb.go b/pkg/accountsdb/accountsdb.go
index 1b578b5d..3eacb915 100644
--- a/pkg/accountsdb/accountsdb.go
+++ b/pkg/accountsdb/accountsdb.go
@@ -34,8 +34,12 @@ type AccountsDb struct {
 	CommonAcctsCache otter.Cache[solana.PublicKey, *accounts.Account]
 	ProgramCache     otter.Cache[solana.PublicKey, *ProgramCacheEntry]
 
-	// A list of store requests. They are added to the back as they arrive and
-	// removed from the front as they are persisted.
+	// A list of storeRequest | prefetchRequest. They are added to the back as
+	// they arrive and removed from the front as they are persisted. AccountsDb
+	// readers should check storeRequests for the latest account states.
+	// prefetchRequests don't modify disk state, but the store worker processes
+	// both types of requests serially to avoid difficulties in keeping disk
+	// and cache in sync.
 	inProgressStoreRequestsMu sync.Mutex
 	inProgressStoreRequests   *list.List
 	storeRequestChan          chan *list.Element
@@ -55,6 +59,10 @@ type storeRequest struct {
 	cb func()
 }
 
+type prefetchRequest struct {
+	keys map[solana.PublicKey]struct{}
+}
+
 // silentLogger implements pebble.Logger but discards all messages.
 // This suppresses verbose WAL recovery messages on startup.
 type silentLogger struct{}
@@ -115,7 +123,7 @@ func OpenDb(accountsDbDir string) (*AccountsDb, error) {
 	mlog.Log.Infof("StoreAsync=%t", StoreAsync)
 	if StoreAsync {
 		accountsDb.inProgressStoreRequests = list.New()
-		accountsDb.storeRequestChan = make(chan *list.Element)
+		accountsDb.storeRequestChan = make(chan *list.Element, 1)
 		accountsDb.storeWorkerDone = make(chan struct{})
 		go accountsDb.storeWorker()
 	}
@@ -206,20 +214,25 @@ func (accountsDb *AccountsDb) GetAccount(slot uint64, pubkey solana.PublicKey) (
 			return accts[0], nil
 		}
 	}
-	return accountsDb.getStoredAccount(slot, pubkey)
+	return accountsDb.getStoredAccount(pubkey)
 }
 
-func (accountsDb *AccountsDb) getStoredAccount(slot uint64, pubkey solana.PublicKey) (*accounts.Account, error) {
+func (accountsDb *AccountsDb) getStoredAccount(pubkey solana.PublicKey) (*accounts.Account, error) {
+	r := trace.StartRegion(context.Background(), "GetStoredAccountCache")
 	cachedAcct, hasAcct := accountsDb.VoteAcctCache.Get(pubkey)
 	if hasAcct {
+		r.End()
 		return cachedAcct, nil
 	}
 
 	cachedAcct, hasAcct = accountsDb.CommonAcctsCache.Get(pubkey)
 	if hasAcct {
+		r.End()
 		return cachedAcct, nil
 	}
+	r.End()
 
+	defer trace.StartRegion(context.Background(), "GetStoredAccountDisk").End()
 	acctIdxEntryBytes, c, err := accountsDb.Index.Get(pubkey[:])
 	if err != nil {
 		//mlog.Log.Debugf("no account found in accountsdb for pubkey %s: %s", pubkey, err)
@@ -276,12 +289,16 @@ func (accountsDb *AccountsDb) getStoredAccount(slot uint64, pubkey solana.Public
 // Returns a slice of the same length as the input with results matching indexes, nil if not found.
 // Returns clones to avoid data races with the store worker.
 func (accountsDb *AccountsDb) getStoreInProgressAccounts(pks []solana.PublicKey) []*accounts.Account {
+	defer trace.StartRegion(context.Background(), "getStoreInProgressAccounts").End()
 	out := make([]*accounts.Account, len(pks))
 	accountsDb.inProgressStoreRequestsMu.Lock()
 	defer accountsDb.inProgressStoreRequestsMu.Unlock()
 	// Start with newest first.
 	for e := accountsDb.inProgressStoreRequests.Back(); e != nil; e = e.Prev() {
-		sr := e.Value.(storeRequest)
+		sr, ok := e.Value.(storeRequest)
+		if !ok {
+			continue
+		}
 		for i := range len(pks) {
 			if out[i] != nil {
 				continue // Already found.
@@ -357,13 +374,27 @@ func (accountsDb *AccountsDb) storeAccountsSync(accts []*accounts.Account, slot
 func (accountsDb *AccountsDb) storeWorker() {
 	defer close(accountsDb.storeWorkerDone)
 	for elt := range accountsDb.storeRequestChan {
-		sr := elt.Value.(storeRequest)
-		accountsDb.storeAccountsSync(sr.accts, sr.slot)
-		accountsDb.inProgressStoreRequestsMu.Lock()
-		accountsDb.inProgressStoreRequests.Remove(elt)
-		accountsDb.inProgressStoreRequestsMu.Unlock()
-		if sr.cb != nil {
-			sr.cb()
+		switch v := elt.Value.(type) {
+		case storeRequest:
+			accountsDb.storeAccountsSync(v.accts, v.slot)
+			accountsDb.inProgressStoreRequestsMu.Lock()
+			accountsDb.inProgressStoreRequests.Remove(elt)
+			accountsDb.inProgressStoreRequestsMu.Unlock()
+			if v.cb != nil {
+				v.cb()
+			}
+
+		case prefetchRequest:
+			accountsDb.prefetchSync(v.keys)
+			accountsDb.inProgressStoreRequestsMu.Lock()
+			accountsDb.inProgressStoreRequests.Remove(elt)
+			accountsDb.inProgressStoreRequestsMu.Unlock()
+
+		default:
+			mlog.Log.Errorf("storeWorker: unknown request type %T", elt.Value)
+			accountsDb.inProgressStoreRequestsMu.Lock()
+			accountsDb.inProgressStoreRequests.Remove(elt)
+			accountsDb.inProgressStoreRequestsMu.Unlock()
 		}
 	}
 }
@@ -602,6 +633,26 @@ func (accountsDb *AccountsDb) parallelStoreAccounts(n int, accts []*accounts.Acc
 	}
 }
 
+func (accountsDb *AccountsDb) Prefetch(keys map[solana.PublicKey]struct{}) {
+	if StoreAsync {
+		accountsDb.inProgressStoreRequestsMu.Lock()
+		element := accountsDb.inProgressStoreRequests.PushBack(prefetchRequest{keys})
+		accountsDb.inProgressStoreRequestsMu.Unlock()
+		accountsDb.storeRequestChan <- element
+	} else {
+		accountsDb.prefetchSync(keys)
+	}
+}
+
+func (accountsDb *AccountsDb) prefetchSync(keys map[solana.PublicKey]struct{}) {
+	defer trace.StartRegion(context.Background(), "Prefetch").End()
+	keySlice := make([]solana.PublicKey, 0, len(keys))
+	for k := range keys {
+		keySlice = append(keySlice, k)
+	}
+	accountsDb.GetAccountsBatch(context.Background(), keySlice)
+}
+
 func (accountsDb *AccountsDb) GetBankHashForSlot(slot uint64) ([]byte, error) {
 	var slotBytes [8]byte
 	binary.LittleEndian.PutUint64(slotBytes[:], slot)
diff --git a/pkg/accountsdb/batch.go b/pkg/accountsdb/batch.go
index bd1aae5b..5b8ba24b 100644
--- a/pkg/accountsdb/batch.go
+++ b/pkg/accountsdb/batch.go
@@ -12,7 +12,7 @@ import (
 
 var systemProgramAddr [32]byte
 
-func (db *AccountsDb) GetAccountsBatch(ctx context.Context, slot uint64, pks []solana.PublicKey) ([]*accounts.Account, error) {
+func (db *AccountsDb) GetAccountsBatch(ctx context.Context, pks []solana.PublicKey) ([]*accounts.Account, error) {
 	n := len(pks)
 	if n == 0 {
 		return nil, nil
@@ -49,7 +49,7 @@ func (db *AccountsDb) GetAccountsBatch(ctx context.Context, slot uint64, pks []s
 		default:
 		}
 
-		acct, err := db.getStoredAccount(slot, key)
+		acct, err := db.getStoredAccount(key)
 		if err != nil && err != ErrNoAccount {
 			select {
 			case errCh <- err:
diff --git a/pkg/block/block.go b/pkg/block/block.go
index d21329ad..4706aa0c 100644
--- a/pkg/block/block.go
+++ b/pkg/block/block.go
@@ -69,3 +69,25 @@ type BlockRewardsInfo struct {
 	Lamports    uint64
 	PostBalance uint64
 }
+
+func (b *Block) UniqueAccounts() map[solana.PublicKey]struct{} {
+	numPubkeys := 0
+	for _, tx := range b.Transactions {
+		numPubkeys += len(tx.Message.AccountKeys)
+	}
+
+	numPubkeys += len(b.UpdatedAccts)
+
+	pubkeyMap := make(map[solana.PublicKey]struct{}, numPubkeys)
+
+	for _, tx := range b.Transactions {
+		for _, pk := range tx.Message.AccountKeys {
+			pubkeyMap[pk] = struct{}{}
+		}
+	}
+
+	for _, pk := range b.UpdatedAccts {
+		pubkeyMap[pk] = struct{}{}
+	}
+	return pubkeyMap
+}
diff --git a/pkg/blockstream/block_source.go b/pkg/blockstream/block_source.go
index 6280b3d6..3c8340c9 100644
--- a/pkg/blockstream/block_source.go
+++ b/pkg/blockstream/block_source.go
@@ -11,7 +11,6 @@ import (
 	"sync/atomic"
 	"time"
 
-	"github.com/Overclock-Validator/mithril/pkg/block"
 	b "github.com/Overclock-Validator/mithril/pkg/block"
 	"github.com/Overclock-Validator/mithril/pkg/mlog"
 	"github.com/Overclock-Validator/mithril/pkg/rpcclient"
@@ -30,9 +29,9 @@ const (
 type BlockSourceOpts struct {
 	RpcClient  *rpcclient.RpcClient // Primary RPC for block fetching (getBlock)
 	SourceType BlockSourceType
-	StartSlot   uint64
-	EndSlot     uint64
-	BlockDir    string
+	StartSlot  uint64
+	EndSlot    uint64
+	BlockDir   string
 
 	// Backup RPC endpoints for failover (optional)
 	// These are tried in order if the primary fails with hard connectivity errors
@@ -161,35 +160,35 @@ type BlockSourceStats struct {
 }
 
 type BlockSource struct {
-	rpcClients   []*rpcclient.RpcClient // All RPC clients for block fetching (index 0 = primary)
-	streamChan   chan *b.Block
-	startSlot    uint64
-	endSlot      uint64
-	currentSlot  uint64
-	blockDir     string
-	sourceType   BlockSourceType
+	rpcClients  []*rpcclient.RpcClient // All RPC clients for block fetching (index 0 = primary)
+	streamChan  chan *b.Block
+	startSlot   uint64
+	endSlot     uint64
+	currentSlot uint64
+	blockDir    string
+	sourceType  BlockSourceType
 
 	// RPC failover tracking
-	activeRpcIdx        atomic.Int32  // Currently active RPC index (0 = primary)
-	slotsSinceFailover  atomic.Uint64 // Slots emitted since failover (for retry timing)
-	failoverCount       atomic.Uint64 // Total failovers (for stats)
-	hardErrCount        atomic.Uint64 // Consecutive hard connectivity errors (reset on success)
-	lastHardErrTime     atomic.Int64  // Unix timestamp of last hard error (for time-windowing)
+	activeRpcIdx       atomic.Int32  // Currently active RPC index (0 = primary)
+	slotsSinceFailover atomic.Uint64 // Slots emitted since failover (for retry timing)
+	failoverCount      atomic.Uint64 // Total failovers (for stats)
+	hardErrCount       atomic.Uint64 // Consecutive hard connectivity errors (reset on success)
+	lastHardErrTime    atomic.Int64  // Unix timestamp of last hard error (for time-windowing)
 
 	// Rate limiting
 	rateLimiter *rate.Limiter
 	maxInflight int
 
 	// Tip tracking
-	confirmedTip       atomic.Uint64
-	processedTip       atomic.Uint64 // Processed commitment tip (super tip)
-	tipAtSlot          atomic.Uint64 // What slot we had executed when tip was measured
-	lastExecutedSlot   atomic.Uint64 // Last slot fully executed by replay (set by SetLastExecutedSlot)
-	tipSafetyMargin    uint64
-	tipPollInterval    time.Duration
-	lastTipUpdate      atomic.Int64  // Unix timestamp of last successful tip poll
-	tipPollFailures    atomic.Uint64 // Consecutive tip poll failures
-	totalTipPollFails  atomic.Uint64 // Total tip poll failures (for stats)
+	confirmedTip      atomic.Uint64
+	processedTip      atomic.Uint64 // Processed commitment tip (super tip)
+	tipAtSlot         atomic.Uint64 // What slot we had executed when tip was measured
+	lastExecutedSlot  atomic.Uint64 // Last slot fully executed by replay (set by SetLastExecutedSlot)
+	tipSafetyMargin   uint64
+	tipPollInterval   time.Duration
+	lastTipUpdate     atomic.Int64  // Unix timestamp of last successful tip poll
+	tipPollFailures   atomic.Uint64 // Consecutive tip poll failures
+	totalTipPollFails atomic.Uint64 // Total tip poll failures (for stats)
 
 	// Reorder buffer
 	reorderMu sync.Mutex
@@ -219,11 +218,11 @@ type BlockSource struct {
 	stallError atomic.Bool // Set when stall timeout triggers
 
 	// Stall diagnostics
-	waitingSlotErrorsMu     sync.Mutex
-	waitingSlotErrors       map[uint64]*slotErrorInfo // Per-slot error tracking
-	lastStallHeartbeat      atomic.Int64              // Unix timestamp of last stall heartbeat log
-	lastFailoverTime        atomic.Int64              // Unix timestamp of last RPC failover
-	lastPriorityBlockedLog  atomic.Int64              // Unix timestamp of last "priority slot blocked" log
+	waitingSlotErrorsMu    sync.Mutex
+	waitingSlotErrors      map[uint64]*slotErrorInfo // Per-slot error tracking
+	lastStallHeartbeat     atomic.Int64              // Unix timestamp of last stall heartbeat log
+	lastFailoverTime       atomic.Int64              // Unix timestamp of last RPC failover
+	lastPriorityBlockedLog atomic.Int64              // Unix timestamp of last "priority slot blocked" log
 
 	// Near-tip mode tracking
 	isNearTip atomic.Bool // True when close to confirmed tip
@@ -333,23 +332,23 @@ func NewBlockSource(opts *BlockSourceOpts) *BlockSource {
 	}
 
 	bs := &BlockSource{
-		rpcClients:       rpcClients,
-		streamChan:       make(chan *b.Block, streamChanBuffer),
-		startSlot:        opts.StartSlot,
-		endSlot:          opts.EndSlot,
-		currentSlot:      opts.StartSlot,
-		blockDir:         opts.BlockDir,
-		sourceType:       opts.SourceType,
-		rateLimiter:      rate.NewLimiter(rate.Limit(maxRPS), maxRPS),
-		maxInflight:      maxInflight,
-		tipSafetyMargin:  tipSafetyMargin,
-		tipPollInterval:  time.Duration(tipPollMs) * time.Millisecond,
-		reorderBuffer:    make(map[uint64]*b.Block),
-		skippedSlots:     make(map[uint64]bool),
-		nextSlotToSend:   opts.StartSlot,
-		maxPending:       defaultMaxPending,
-		slotState:        make(map[uint64]slotStatus),
-		inflightStart:    make(map[uint64]time.Time),
+		rpcClients:      rpcClients,
+		streamChan:      make(chan *b.Block, streamChanBuffer),
+		startSlot:       opts.StartSlot,
+		endSlot:         opts.EndSlot,
+		currentSlot:     opts.StartSlot,
+		blockDir:        opts.BlockDir,
+		sourceType:      opts.SourceType,
+		rateLimiter:     rate.NewLimiter(rate.Limit(maxRPS), maxRPS),
+		maxInflight:     maxInflight,
+		tipSafetyMargin: tipSafetyMargin,
+		tipPollInterval: time.Duration(tipPollMs) * time.Millisecond,
+		reorderBuffer:   make(map[uint64]*b.Block),
+		skippedSlots:    make(map[uint64]bool),
+		nextSlotToSend:  opts.StartSlot,
+		maxPending:      defaultMaxPending,
+		slotState:       make(map[uint64]slotStatus),
+		inflightStart:   make(map[uint64]time.Time),
 		workQueue:       make(chan uint64, maxInflight*2),
 		resultQueue:     make(chan fetchResult, maxInflight*2),
 		stopChan:        make(chan struct{}),
@@ -653,7 +652,7 @@ func (bs *BlockSource) logStallDiagnostics(prefix string) {
 	mlog.Log.Errorf("=== End %s ===", prefix)
 }
 
-func (bs *BlockSource) tryGetBlockFromFile(slot uint64) (*block.Block, error) {
+func (bs *BlockSource) tryGetBlockFromFile(slot uint64) (*b.Block, error) {
 	if bs.blockDir == "" {
 		return nil, fmt.Errorf("no block directory specified")
 	}
@@ -664,7 +663,7 @@ func (bs *BlockSource) tryGetBlockFromFile(slot uint64) (*block.Block, error) {
 	}
 
 	decoder := json.NewDecoder(file)
-	out := &block.Block{}
+	out := &b.Block{}
 	err = decoder.Decode(out)
 
 	if err != nil {
@@ -785,7 +784,7 @@ func (bs *BlockSource) fetchBlockOnce(slot uint64, rpcIdx int32) (*b.Block, erro
 		return nil, err
 	}
 
-	return block.FromBlockResult(blockResult, slot, rpc), nil
+	return b.FromBlockResult(blockResult, slot, rpc), nil
 }
 
 // pollTip periodically updates the confirmed tip by querying all configured RPCs
@@ -1193,13 +1192,13 @@ func (bs *BlockSource) emitOrderedBlocks() {
 			if result.skipped {
 				bs.stats.FetchSkipped.Add(1)
 				bs.skippedSlots[result.slot] = true
-				bs.hardErrCount.Store(0) // Reset on progress
+				bs.hardErrCount.Store(0)        // Reset on progress
 				bs.clearSlotErrors(result.slot) // Clear stall diagnostics for this slot
 			} else if result.block != nil {
 				// Success! This takes priority over any pending error results.
 				bs.stats.FetchSuccesses.Add(1)
 				bs.reorderBuffer[result.slot] = result.block
-				bs.hardErrCount.Store(0) // Reset error count on success
+				bs.hardErrCount.Store(0)        // Reset error count on success
 				bs.clearSlotErrors(result.slot) // Clear stall diagnostics for this slot
 				// Track max buffered slot
 				if result.slot > bs.stats.MaxBufferedSlot.Load() {
@@ -1671,7 +1670,7 @@ func (bs *BlockSource) fetchAndParseBlockSequential(slot uint64) (*b.Block, erro
 				return nil, fmt.Errorf("error fetching block: %w", err)
 			}
 		}
-		blk = block.FromBlockResult(blockResult, slot, rpc)
+		blk = b.FromBlockResult(blockResult, slot, rpc)
 		}
 	} else if bs.sourceType == BlockSourceLightbringer {
 		// NOTE: BlockSourceLightbringer is TEMPORARILY NON-FUNCTIONAL.
diff --git a/pkg/blockstream/prefetcher.go b/pkg/blockstream/prefetcher.go
new file mode 100644
index 00000000..727c1bd3
--- /dev/null
+++ b/pkg/blockstream/prefetcher.go
@@ -0,0 +1,57 @@
+package blockstream
+
+import (
+	"context"
+
+	"github.com/Overclock-Validator/mithril/pkg/accountsdb"
+	"github.com/Overclock-Validator/mithril/pkg/block"
+	"github.com/Overclock-Validator/mithril/pkg/mlog"
+)
+
+type Prefetcher struct {
+	src        *BlockSource
+	accountsDb *accountsdb.AccountsDb
+	out        chan *block.Block
+}
+
+func NewPrefetcher(
+	ctx context.Context,
+	b *BlockSource,
+	a *accountsdb.AccountsDb,
+) *Prefetcher {
+	p := &Prefetcher{b, a, make(chan *block.Block)}
+	go p.prefetchWorker(ctx)
+	return p
+}
+
+func (p *Prefetcher) NextBlock() *block.Block {
+	return <-p.out
+}
+
+func (p *Prefetcher) prefetchWorker(ctx context.Context) {
+	defer close(p.out)
+	for {
+		b := p.src.NextBlock()
+		if b == nil {
+			return
+		}
+		select {
+		case <-ctx.Done():
+			mlog.Log.Infof("prefetch worker exiting: %v", ctx.Err())
+			return
+		case p.out <- b: // Caller was waiting on NextBlock so return it immediately.
+			continue
+		default:
+		}
+
+		// TODO account for ALTs
+		p.accountsDb.Prefetch(b.UniqueAccounts())
+
+		select {
+		case <-ctx.Done():
+			mlog.Log.Infof("prefetch worker exiting: %v", ctx.Err())
+			return
+		case p.out <- b:
+		}
+	}
+}
diff --git a/pkg/replay/block.go b/pkg/replay/block.go
index 93f64c22..482df55c 100644
--- a/pkg/replay/block.go
+++ b/pkg/replay/block.go
@@ -74,6 +74,8 @@ var commitSlot atomic.Uint64 // The slot currently being committed (for error me
 // CurrentRunID is a unique identifier for this replay session, used to correlate logs
 var CurrentRunID string
 
+var EnablePrefetcher bool = false
+
 // GenerateRunID creates a short random hex string for log correlation
 func GenerateRunID() string {
 	b := make([]byte, 4) // 8 hex chars
@@ -206,26 +208,43 @@ func resolveAddrTableLookups(accountsDb *accountsdb.AccountsDb, block *b.Block)
 			continue
 		}
 
-		var skipLookup bool
 		for _, addrTableKey := range tx.Message.GetAddressTableLookups().GetTableIDs() {
-			if _, alreadyLoaded := tables[addrTableKey]; alreadyLoaded {
-				continue
-			}
+			tables[addrTableKey] = nil
+		}
+	}
 
-			acct, err := accountsDb.GetAccount(block.Slot, addrTableKey)
-			if err != nil {
-				skipLookup = true
-				break
-			}
+	tablesSlice := make([]solana.PublicKey, 0, len(tables))
+	for t := range tables {
+		tablesSlice = append(tablesSlice, t)
+	}
+	accts, err := accountsDb.GetAccountsBatch(context.Background(), tablesSlice)
+	if err != nil {
+		return err
+	}
 
-			addrLookupTable, err := sealevel.UnmarshalAddressLookupTable(acct.Data)
-			if err != nil {
-				return err
-			}
+	for i := range tablesSlice {
+		if accts[i] == nil {
+			continue
+		}
+		addrLookupTable, err := sealevel.UnmarshalAddressLookupTable(accts[i].Data)
+		if err != nil {
+			return err
+		}
+		tables[tablesSlice[i]] = addrLookupTable.Addresses
+	}
 
-			tables[addrTableKey] = addrLookupTable.Addresses
+	for _, tx := range block.Transactions {
+		if !tx.Message.IsVersioned() || tx.Message.AddressTableLookups.NumLookups() == 0 {
+			continue
 		}
+		skipLookup := false
+		for _, addrTableKey := range tx.Message.GetAddressTableLookups().GetTableIDs() {
+			if tables[addrTableKey] == nil {
+				skipLookup = true
+				break
+			}
+		}
 		if skipLookup {
 			continue
 		}
 
@@ -245,25 +264,7 @@ func resolveAddrTableLookups(accountsDb *accountsdb.AccountsDb, block *b.Block)
 }
 
 func extractAndDedupeBlockAccts(block *b.Block) []solana.PublicKey {
-	var numPubkeys int
-	for _, tx := range block.Transactions {
-		numPubkeys += len(tx.Message.AccountKeys)
-	}
-
-	numPubkeys += len(block.UpdatedAccts)
-
-	pubkeyMap := make(map[solana.PublicKey]struct{}, numPubkeys)
-
-	for _, tx := range block.Transactions {
-		for _, pk := range tx.Message.AccountKeys {
-			pubkeyMap[pk] = struct{}{}
-		}
-	}
-
-	for _, pk := range block.UpdatedAccts {
-		pubkeyMap[pk] = struct{}{}
-	}
-
+	pubkeyMap := block.UniqueAccounts()
 	pubkeys := make([]solana.PublicKey, len(pubkeyMap))
 	i := 0
 	for pk := range pubkeyMap {
@@ -349,6 +350,7 @@ func cacheConstantSysvars(acctsDb *accountsdb.AccountsDb) {
 }
 
 func loadBlockAccountsAndUpdateSysvars(accountsDb *accountsdb.AccountsDb, block *b.Block) (accounts.Accounts, accounts.Accounts, error) {
+	start := time.Now()
 	err := resolveAddrTableLookups(accountsDb, block)
 	if err != nil {
 		return nil, nil, err
@@ -356,11 +358,13 @@ func loadBlockAccountsAndUpdateSysvars(accountsDb *accountsdb.AccountsDb, block
 	dedupedAccts := extractAndDedupeBlockAccts(block)
 
 	ctx := context.Background()
-	slotAccts, err := accountsDb.GetAccountsBatch(ctx, block.Slot, dedupedAccts)
+	slotAccts, err := accountsDb.GetAccountsBatch(ctx, dedupedAccts)
 	if err != nil {
 		return nil, nil, err
 	}
 
+	mlog.Log.Infof("resolveAndLoadAccountsMs=%f", time.Since(start).Seconds()*1000)
+
 	numAccts := uint64(len(slotAccts))
 	accts := accounts.NewMemAccountsWithLen(numAccts)
 	parentAccts := accounts.NewMemAccountsWithLen(numAccts)
@@ -1251,6 +1255,11 @@ func ReplayBlocks(
 		}
 	}
 	blockStream := blockstream.NewBlockSource(opts)
+	var prefetcher *blockstream.Prefetcher
+	mlog.Log.Infof("EnablePrefetcher=%t", EnablePrefetcher)
+	if EnablePrefetcher {
+		prefetcher = blockstream.NewPrefetcher(ctx, blockStream, acctsDb)
+	}
 
 	if !isLive {
 		blockStream.DownloadInitialBlocks()
@@ -1291,7 +1300,12 @@
 		}
 
 		waitStart := time.Now()
-		block := blockStream.NextBlock()
+		var block *b.Block
+		if EnablePrefetcher {
+			block = prefetcher.NextBlock()
+		} else {
+			block = blockStream.NextBlock()
+		}
 		waitTime := time.Since(waitStart)
 
 		// Stop stall monitor
diff --git a/pkg/replay/transaction.go b/pkg/replay/transaction.go
index 533bcfd5..1cb6e32e 100644
--- a/pkg/replay/transaction.go
+++ b/pkg/replay/transaction.go
@@ -299,7 +299,7 @@ func verifySignatures(tx *solana.Transaction, slot uint64, sigverifyWg *sync.Wai
 }
 
 func ProcessTransaction(slotCtx *sealevel.SlotCtx, sigverifyWg *sync.WaitGroup, tx *solana.Transaction, txMeta *rpc.TransactionMeta, dbgOpts *DebugOptions, arena *arena.Arena[sealevel.BorrowedAccount]) (*fees.TxFeeInfo, error) {
-	if slotCtx.TraceCtx != nil {
+	if trace.IsEnabled() && slotCtx.TraceCtx != nil {
 		region := trace.StartRegion(slotCtx.TraceCtx, "ProcessTransaction")
 		defer region.End()