Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add trackers for missing block metadata retroactively #2945

Merged
merged 7 commits into from
Feb 14, 2025
76 changes: 70 additions & 6 deletions arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1207,17 +1207,16 @@ func (s *TransactionStreamer) checkResult(pos arbutil.MessageIndex, msgResult *e
"actual", msgResult.BlockHash,
)
// Try deleting the existing blockMetadata for this block in arbDB and set it as missing
if msgAndBlockInfo.BlockMetadata != nil {
if msgAndBlockInfo.BlockMetadata != nil &&
s.trackBlockMetadataFrom != 0 && pos >= s.trackBlockMetadataFrom {
batch := s.db.NewBatch()
if err := batch.Delete(dbKey(blockMetadataInputFeedPrefix, uint64(pos))); err != nil {
log.Error("error deleting blockMetadata of block whose BlockHash from feed doesn't match locally computed hash", "msgSeqNum", pos, "err", err)
return
}
if s.trackBlockMetadataFrom != 0 && pos >= s.trackBlockMetadataFrom {
if err := batch.Put(dbKey(missingBlockMetadataInputFeedPrefix, uint64(pos)), nil); err != nil {
log.Error("error marking deleted blockMetadata as missing in arbDB for a block whose BlockHash from feed doesn't match locally computed hash", "msgSeqNum", pos, "err", err)
return
}
if err := batch.Put(dbKey(missingBlockMetadataInputFeedPrefix, uint64(pos)), nil); err != nil {
log.Error("error marking deleted blockMetadata as missing in arbDB for a block whose BlockHash from feed doesn't match locally computed hash", "msgSeqNum", pos, "err", err)
return
}
if err := batch.Write(); err != nil {
log.Error("error writing batch that deletes blockMetadata of the block whose BlockHash from feed doesn't match locally computed hash", "msgSeqNum", pos, "err", err)
Expand Down Expand Up @@ -1320,7 +1319,72 @@ func (s *TransactionStreamer) executeMessages(ctx context.Context, ignored struc
return s.config().ExecuteMessageLoopDelay
}

func (s *TransactionStreamer) backfillTrackersForMissingBlockMetadata(ctx context.Context) {
if s.trackBlockMetadataFrom == 0 {
return
}
msgCount, err := s.GetMessageCount()
if err != nil {
log.Error("Error getting message count from arbDB", "err", err)
return
}
if s.trackBlockMetadataFrom >= msgCount {
return // We dont need to back fill if trackBlockMetadataFrom is in the future
}

wasKeyFound := func(pos uint64) bool {
searchWithPrefix := func(prefix []byte) bool {
key := dbKey(prefix, pos)
_, err := s.db.Get(key)
if err == nil {
return true
}
if !dbutil.IsErrNotFound(err) {
log.Error("Error reading key in arbDB while back-filling trackers for missing blockMetadata", "key", key, "err", err)
}
return false
}
return searchWithPrefix(blockMetadataInputFeedPrefix) || searchWithPrefix(missingBlockMetadataInputFeedPrefix)
}

start := s.trackBlockMetadataFrom
if wasKeyFound(uint64(start)) {
return // back-filling not required
}
finish := msgCount - 1
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

start by checking if there is a value for the first tracked block. This will usually succeed then we can be done with just one database access.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ahh I had this check after the binary search (since its a required check), will just move this before search

for start < finish {
mid := (start + finish + 1) / 2
if wasKeyFound(uint64(mid)) {
finish = mid - 1
} else {
start = mid
}
}
lastNonExistent := start

// We back-fill in reverse to avoid fragmentation in case of any failures
batch := s.db.NewBatch()
for i := lastNonExistent; i >= s.trackBlockMetadataFrom; i-- {
if err := batch.Put(dbKey(missingBlockMetadataInputFeedPrefix, uint64(i)), nil); err != nil {
log.Error("Error marking blockMetadata as missing while back-filling", "pos", i, "err", err)
return
}
// If we reached the ideal batch size, commit and reset
if batch.ValueSize() >= ethdb.IdealBatchSize {
if err := batch.Write(); err != nil {
log.Error("Error writing batch with missing trackers to db while back-filling", "err", err)
return
}
batch.Reset()
}
}
if err := batch.Write(); err != nil {
log.Error("Error writing batch with missing trackers to db while back-filling", "err", err)
}
}

func (s *TransactionStreamer) Start(ctxIn context.Context) error {
s.StopWaiter.Start(ctxIn, s)
s.LaunchThread(s.backfillTrackersForMissingBlockMetadata)
return stopwaiter.CallIterativelyWith[struct{}](&s.StopWaiterSafe, s.executeMessages, s.newMessageNotifier)
}
67 changes: 67 additions & 0 deletions arbnode/tx_streamer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package arbnode

import (
"bytes"
"context"
"encoding/binary"
"testing"

"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/rlp"

"github.com/offchainlabs/nitro/arbutil"
)

func TestTimeboostBackfillingsTrackersForMissingBlockMetadata(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

messageCount := uint64(20)

// Create arbDB with fragmented blockMetadata across blocks
arbDb := rawdb.NewMemoryDatabase()
countBytes, err := rlp.EncodeToBytes(messageCount)
Require(t, err)
Require(t, arbDb.Put(messageCountKey, countBytes))
addKeys := func(start, end uint64, prefix []byte) {
for i := start; i <= end; i++ {
Require(t, arbDb.Put(dbKey(prefix, i), []byte{}))
}
}
// 12, 13, 14, 18 have block metadata
addKeys(12, 14, blockMetadataInputFeedPrefix)
addKeys(18, 18, blockMetadataInputFeedPrefix)
// 15, 16, 17, 19 are missing
addKeys(15, 17, missingBlockMetadataInputFeedPrefix)
addKeys(19, 19, missingBlockMetadataInputFeedPrefix)

// Create tx streamer
txStreamer := &TransactionStreamer{db: arbDb}
txStreamer.StopWaiter.Start(ctx, txStreamer)

backfillAndVerifyCorrectness := func(trackBlockMetadataFrom arbutil.MessageIndex, missingTrackers []uint64) {
txStreamer.trackBlockMetadataFrom = trackBlockMetadataFrom
txStreamer.backfillTrackersForMissingBlockMetadata(ctx)
iter := arbDb.NewIterator([]byte("x"), nil)
pos := 0
for iter.Next() {
keyBytes := bytes.TrimPrefix(iter.Key(), missingBlockMetadataInputFeedPrefix)
if binary.BigEndian.Uint64(keyBytes) != missingTrackers[pos] {
t.Fatalf("unexpected presence of blockMetadata. msgSeqNum: %d, expectedMsgSeqNum: %d", binary.BigEndian.Uint64(keyBytes), missingTrackers[pos])
}
pos++
}
if pos != len(missingTrackers) {
t.Fatalf("number of keys with blockMetadataInputFeedPrefix doesn't match expected value. Want: %d, Got: %d", len(missingTrackers), pos)
}
iter.Release()
}

// Backfill trackers for missing data and verify that 10, 11 get added to already existing 16, 17, 18, 19 keys
backfillAndVerifyCorrectness(10, []uint64{10, 11, 15, 16, 17, 19})

// Backfill trackers for missing data and verify that 5, 6, 7, 8, 9 get added to already existing 10, 11, 16, 17, 18, 19 keys
backfillAndVerifyCorrectness(5, []uint64{5, 6, 7, 8, 9, 10, 11, 15, 16, 17, 19})
}
Loading