Skip to content

Commit

Permalink
Add trackers for missing block metadata retroactively
Browse files Browse the repository at this point in the history
  • Loading branch information
ganeshvanahalli committed Feb 14, 2025
1 parent 13b9b8b commit 3cdab05
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 6 deletions.
76 changes: 70 additions & 6 deletions arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1207,17 +1207,16 @@ func (s *TransactionStreamer) checkResult(pos arbutil.MessageIndex, msgResult *e
"actual", msgResult.BlockHash,
)
// Try deleting the existing blockMetadata for this block in arbDB and set it as missing
if msgAndBlockInfo.BlockMetadata != nil {
if msgAndBlockInfo.BlockMetadata != nil &&
s.trackBlockMetadataFrom != 0 && pos >= s.trackBlockMetadataFrom {
batch := s.db.NewBatch()
if err := batch.Delete(dbKey(blockMetadataInputFeedPrefix, uint64(pos))); err != nil {
log.Error("error deleting blockMetadata of block whose BlockHash from feed doesn't match locally computed hash", "msgSeqNum", pos, "err", err)
return
}
if s.trackBlockMetadataFrom != 0 && pos >= s.trackBlockMetadataFrom {
if err := batch.Put(dbKey(missingBlockMetadataInputFeedPrefix, uint64(pos)), nil); err != nil {
log.Error("error marking deleted blockMetadata as missing in arbDB for a block whose BlockHash from feed doesn't match locally computed hash", "msgSeqNum", pos, "err", err)
return
}
if err := batch.Put(dbKey(missingBlockMetadataInputFeedPrefix, uint64(pos)), nil); err != nil {
log.Error("error marking deleted blockMetadata as missing in arbDB for a block whose BlockHash from feed doesn't match locally computed hash", "msgSeqNum", pos, "err", err)
return
}
if err := batch.Write(); err != nil {
log.Error("error writing batch that deletes blockMetadata of the block whose BlockHash from feed doesn't match locally computed hash", "msgSeqNum", pos, "err", err)
Expand Down Expand Up @@ -1320,7 +1319,72 @@ func (s *TransactionStreamer) executeMessages(ctx context.Context, ignored struc
return s.config().ExecuteMessageLoopDelay
}

func (s *TransactionStreamer) backfillTrackersForMissingBlockMetadata(ctx context.Context) {
if s.trackBlockMetadataFrom == 0 {
return
}
msgCount, err := s.GetMessageCount()
if err != nil {
log.Error("Error getting message count from arbDB", "err", err)
return
}
if s.trackBlockMetadataFrom >= msgCount {
return // We dont need to back fill if trackBlockMetadataFrom is in the future
}

wasKeyFound := func(pos uint64) bool {
searchWithPrefix := func(prefix []byte) bool {
key := dbKey(prefix, pos)
_, err := s.db.Get(key)
if err == nil {
return true
}
if !dbutil.IsErrNotFound(err) {
log.Error("Error reading key in arbDB while back-filling trackers for missing blockMetadata", "key", key, "err", err)
}
return false
}
return searchWithPrefix(blockMetadataInputFeedPrefix) || searchWithPrefix(missingBlockMetadataInputFeedPrefix)
}

start := s.trackBlockMetadataFrom
if wasKeyFound(uint64(start)) {
return // back-filling not required
}
finish := msgCount - 1
for start < finish {
mid := (start + finish + 1) / 2
if wasKeyFound(uint64(mid)) {
finish = mid - 1
} else {
start = mid
}
}
lastNonExistent := start

// We back-fill in reverse to avoid fragmentation in case of any failures
batch := s.db.NewBatch()
for i := lastNonExistent; i >= s.trackBlockMetadataFrom; i-- {
if err := batch.Put(dbKey(missingBlockMetadataInputFeedPrefix, uint64(i)), nil); err != nil {
log.Error("Error marking blockMetadata as missing while back-filling", "pos", i, "err", err)
return
}
// If we reached the ideal batch size, commit and reset
if batch.ValueSize() >= ethdb.IdealBatchSize {
if err := batch.Write(); err != nil {
log.Error("Error writing batch with missing trackers to db while back-filling", "err", err)
return
}
batch.Reset()
}
}
if err := batch.Write(); err != nil {
log.Error("Error writing batch with missing trackers to db while back-filling", "err", err)
}
}

func (s *TransactionStreamer) Start(ctxIn context.Context) error {
s.StopWaiter.Start(ctxIn, s)
s.LaunchThread(s.backfillTrackersForMissingBlockMetadata)
return stopwaiter.CallIterativelyWith[struct{}](&s.StopWaiterSafe, s.executeMessages, s.newMessageNotifier)
}
67 changes: 67 additions & 0 deletions arbnode/tx_streamer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package arbnode

import (
"bytes"
"context"
"encoding/binary"
"testing"

"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/rlp"

"github.com/offchainlabs/nitro/arbutil"
)

func TestTimeboostBackfillingsTrackersForMissingBlockMetadata(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

messageCount := uint64(20)

// Create arbDB with fragmented blockMetadata across blocks
arbDb := rawdb.NewMemoryDatabase()
countBytes, err := rlp.EncodeToBytes(messageCount)
Require(t, err)
Require(t, arbDb.Put(messageCountKey, countBytes))
addKeys := func(start, end uint64, prefix []byte) {
for i := start; i <= end; i++ {
Require(t, arbDb.Put(dbKey(prefix, i), []byte{}))
}
}
// 12, 13, 14, 18 have block metadata
addKeys(12, 14, blockMetadataInputFeedPrefix)
addKeys(18, 18, blockMetadataInputFeedPrefix)
// 15, 16, 17, 19 are missing
addKeys(15, 17, missingBlockMetadataInputFeedPrefix)
addKeys(19, 19, missingBlockMetadataInputFeedPrefix)

// Create tx streamer
txStreamer := &TransactionStreamer{db: arbDb}
txStreamer.StopWaiter.Start(ctx, txStreamer)

backfillAndVerifyCorrectness := func(trackBlockMetadataFrom arbutil.MessageIndex, missingTrackers []uint64) {
txStreamer.trackBlockMetadataFrom = trackBlockMetadataFrom
txStreamer.backfillTrackersForMissingBlockMetadata(ctx)
iter := arbDb.NewIterator([]byte("x"), nil)
pos := 0
for iter.Next() {
keyBytes := bytes.TrimPrefix(iter.Key(), missingBlockMetadataInputFeedPrefix)
if binary.BigEndian.Uint64(keyBytes) != missingTrackers[pos] {
t.Fatalf("unexpected presence of blockMetadata. msgSeqNum: %d, expectedMsgSeqNum: %d", binary.BigEndian.Uint64(keyBytes), missingTrackers[pos])
}
pos++
}
if pos != len(missingTrackers) {
t.Fatalf("number of keys with blockMetadataInputFeedPrefix doesn't match expected value. Want: %d, Got: %d", len(missingTrackers), pos)
}
iter.Release()
}

// Backfill trackers for missing data and verify that 10, 11 get added to already existing 16, 17, 18, 19 keys
backfillAndVerifyCorrectness(10, []uint64{10, 11, 15, 16, 17, 19})

// Backfill trackers for missing data and verify that 5, 6, 7, 8, 9 get added to already existing 10, 11, 16, 17, 18, 19 keys
backfillAndVerifyCorrectness(5, []uint64{5, 6, 7, 8, 9, 10, 11, 15, 16, 17, 19})
}

0 comments on commit 3cdab05

Please sign in to comment.