Skip to content

Commit

Permalink
Caplin: Added more logs to --caplin.archive (#13074)
Browse files Browse the repository at this point in the history
Co-authored-by: Kewei <[email protected]>
  • Loading branch information
Giulio2002 and domiwei authored Dec 11, 2024
1 parent 98545dc commit f22317e
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 8 deletions.
8 changes: 7 additions & 1 deletion cl/antiquary/antiquary.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,13 @@ func (a *Antiquary) Loop() error {
}
}

a.stateSn.OpenFolder()
if a.stateSn != nil {
if err := a.stateSn.OpenFolder(); err != nil {
return err
}
}
log.Info("[Caplin] Stat", "blocks-static", a.sn.BlocksAvailable(), "states-static", a.stateSn.BlocksAvailable(), "blobs-static", a.sn.FrozenBlobs(),
"state-history-enabled", a.states, "block-history-enabled", a.blocks, "blob-history-enabled", a.blobs, "snapgen", a.snapgen)

frozenSlots := a.sn.BlocksAvailable()
if frozenSlots != 0 {
Expand Down
23 changes: 16 additions & 7 deletions cl/antiquary/state_antiquary.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ func (s *Antiquary) loopStates(ctx context.Context) {
// Execute this each second
reqRetryTimer := time.NewTicker(100 * time.Millisecond)
defer reqRetryTimer.Stop()

if !initial_state.IsGenesisStateSupported(clparams.NetworkType(s.cfg.DepositNetworkID)) {
s.logger.Warn("Genesis state is not supported for this network, no historical states data will be available")
return
}

_, beforeFinalized, err := s.readHistoricalProcessingProgress(ctx)
if err != nil {
s.logger.Error("Failed to read historical processing progress", "err", err)
Expand All @@ -81,14 +81,16 @@ func (s *Antiquary) loopStates(ctx context.Context) {
s.logger.Error("Failed to read historical processing progress", "err", err)
continue
}
if s.sn == nil || s.syncedData.Syncing() {
continue
}

// We wait for updated finality.
if finalized == beforeFinalized {
continue
}
beforeFinalized = finalized
if s.sn == nil || s.syncedData.Syncing() {
continue
}

if err := s.IncrementBeaconState(ctx, finalized); err != nil {
if s.currentState != nil {
s.logger.Warn("Could not to increment beacon state, trying again later", "err", err, "slot", s.currentState.Slot())
Expand Down Expand Up @@ -140,9 +142,16 @@ func FillStaticValidatorsTableIfNeeded(ctx context.Context, logger log.Logger, s
blocksAvaiable := stateSn.BlocksAvailable()
stateSnRoTx := stateSn.View()
defer stateSnRoTx.Close()

log.Info("[Caplin-Archive] filling validators table", "from", 0, "to", stateSn.BlocksAvailable())
logTicker := time.NewTicker(10 * time.Second)
defer logTicker.Stop()
start := time.Now()
for slot := uint64(0); slot <= stateSn.BlocksAvailable(); slot++ {
select {
case <-logTicker.C:
log.Info("[Caplin-Archive] Filled validators table", "progress", fmt.Sprintf("%d/%d", slot, stateSn.BlocksAvailable()))
default:
}
seg, ok := stateSnRoTx.VisibleSegment(slot, kv.StateEvents)
if !ok {
return false, fmt.Errorf("segment not found for slot %d", slot)
Expand Down Expand Up @@ -437,7 +446,7 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error {
// We now do some post-processing on the state.
select {
case <-progressTimer.C:
log.Log(logLvl, "State processing progress", "slot", slot, "blk/sec", fmt.Sprintf("%.2f", float64(slot-prevSlot)/60))
log.Log(logLvl, "[Caplin-Archive] Historical States reconstruction", "slot", slot, "blk/sec", fmt.Sprintf("%.2f", float64(slot-prevSlot)/60))
prevSlot = slot
default:
}
Expand Down Expand Up @@ -491,7 +500,7 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error {
return err
}

log.Info("Historical states antiquated", "slot", s.currentState.Slot(), "root", libcommon.Hash(stateRoot), "latency", endTime)
log.Info("[Caplin-Archive] Historical states antiquated", "slot", s.currentState.Slot(), "root", libcommon.Hash(stateRoot), "latency", endTime)
if s.stateSn != nil {
if err := s.stateSn.OpenFolder(); err != nil {
return err
Expand Down
3 changes: 3 additions & 0 deletions cl/phase1/stages/forkchoice.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,9 @@ func postForkchoiceOperations(ctx context.Context, tx kv.RwTx, logger log.Logger
if _, err = cfg.attestationDataProducer.ProduceAndCacheAttestationData(tx, headState, headRoot, headState.Slot(), 0); err != nil {
logger.Warn("failed to produce and cache attestation data", "err", err)
}
if err := beacon_indicies.WriteHighestFinalized(tx, cfg.forkChoice.FinalizedSlot()); err != nil {
return err
}
start := time.Now()
cfg.forkChoice.SetSynced(true) // Now we are synced
// Update the head state with the new head state
Expand Down

0 comments on commit f22317e

Please sign in to comment.