Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 25 additions & 61 deletions pkg/accountsdb/accountsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (

type AccountsDb struct {
Index *pebble.DB
BankHashStore *pebble.DB
AcctsDir string
LargestFileId atomic.Uint64
VoteAcctCache otter.Cache[solana.PublicKey, *accounts.Account]
Expand Down Expand Up @@ -103,13 +102,7 @@ func OpenDb(accountsDbDir string) (*AccountsDb, error) {
return nil, fmt.Errorf("opening indexDir=%s: %w", indexDir, err)
}

bankhashDir := filepath.Join(accountsDbDir, "bankhash_db")
bankhashDb, err := pebble.Open(bankhashDir, &pebble.Options{Logger: silentLogger{}})
if err != nil {
return nil, fmt.Errorf("opening bankhashDir=%s: %w", bankhashDir, err)
}

accountsDb := &AccountsDb{Index: db, BankHashStore: bankhashDb, AcctsDir: appendVecsDir}
accountsDb := &AccountsDb{Index: db, AcctsDir: appendVecsDir}
accountsDb.LargestFileId.Store(largestFileId)

mlog.Log.Infof("StoreAsync=%t", StoreAsync)
Expand Down Expand Up @@ -145,10 +138,6 @@ func (accountsDb *AccountsDb) CloseDb() {
if err := accountsDb.Index.Close(); err != nil {
mlog.Log.Errorf("CloseDb: Index.Close() error: %v", err)
}
mlog.Log.Infof("CloseDb: syncing and closing BankHashStore...")
if err := accountsDb.BankHashStore.Close(); err != nil {
mlog.Log.Errorf("CloseDb: BankHashStore.Close() error: %v", err)
}
mlog.Log.Infof("CloseDb: done\n") // extra newline for spacing after close
}

Expand Down Expand Up @@ -225,17 +214,14 @@ func (accountsDb *AccountsDb) getStoredAccount(slot uint64, pubkey solana.Public
r.End()

defer trace.StartRegion(context.Background(), "GetStoredAccountDisk").End()
acctIdxEntryBytes, c, err := accountsDb.Index.Get(pubkey[:])
if err != nil {
acctIdxEntry, err := getAccountIndexEntry(accountsDb.Index, pubkey)
if errors.Is(err, pebble.ErrNotFound) {
//mlog.Log.Debugf("no account found in accountsdb for pubkey %s: %s", pubkey, err)
return nil, ErrNoAccount
}

acctIdxEntry, err := unmarshalAcctIdxEntry(acctIdxEntryBytes)
if err != nil {
panic("failed to unmarshal AccountIndexEntry from index kv database")
panic(err)
}
c.Close()

appendVecFileName := fmt.Sprintf("%s/%d.%d", accountsDb.AcctsDir, acctIdxEntry.Slot, acctIdxEntry.FileId)

Expand Down Expand Up @@ -385,34 +371,19 @@ func (accountsDb *AccountsDb) storeAccountsInternal(accts []*accounts.Account, s
defer appendVecFile.Close()

appendVecAcctsBuf := new(bytes.Buffer)
writer := new(bytes.Buffer)
var acctIdxEntryBuf [24]byte

for _, acct := range accts {
if acct == nil {
continue
}

// create index entry, encode it and write it to the index kv store
// offset field is specified as the current num of bytes written to the appendvec buffer.
writer.Reset()

indexEntry := AccountIndexEntry{Slot: slot, FileId: fileId, Offset: uint64(appendVecAcctsBuf.Len())}
indexEntry.Marshal(&acctIdxEntryBuf)

// if an entry already existed in the index for this account, very often we can simply make the state update
// in-place, i.e. into the account's existing appendvec blob.
// we can make the account state update in-place iff the existing version's data length is the same as the
// new version's data length, which is the case about 98% of the time.
// if not, then we write out a new appendvec.
existingacctIdxEntryBuf, c, err := accountsDb.Index.Get(acct.Key[:])
acctIdxEntry, err := getAccountIndexEntry(accountsDb.Index, acct.Key)
if err == nil {
acctIdxEntry, err := unmarshalAcctIdxEntry(existingacctIdxEntryBuf)
if err != nil {
panic("failed to unmarshal AccountIndexEntry from index kv database")
}
c.Close()

existingAppendVecFileName := fmt.Sprintf("%s/%d.%d", accountsDb.AcctsDir, acctIdxEntry.Slot, acctIdxEntry.FileId)
existingAppendVecFile, err := os.OpenFile(existingAppendVecFileName, os.O_RDWR, 0666)
if err != nil {
Expand Down Expand Up @@ -448,9 +419,10 @@ func (accountsDb *AccountsDb) storeAccountsInternal(accts []*accounts.Account, s
}
}

err = accountsDb.Index.Set(acct.Key[:], acctIdxEntryBuf[:], &pebble.WriteOptions{})
indexEntry := AccountIndexEntry{Slot: slot, FileId: fileId, Offset: uint64(appendVecAcctsBuf.Len())}
err = setAccountIndexEntry(accountsDb.Index, acct.Key, indexEntry)
if err != nil {
panic(fmt.Sprintf("unable to add acct for %s to acctsdb: %v", acct.Key, err))
panic(fmt.Sprintf("unable to add acct for %s to acctsdb: %v", acct.Key.String(), err))
}

// marshal up the account as an appendvec style account and write it to the buffer
Expand Down Expand Up @@ -501,19 +473,14 @@ func (accountsDb *AccountsDb) parallelStoreAccounts(n int, accts []*accounts.Acc
return ctx.Err()
}
err := func(a *accounts.Account) error {
existingacctIdxEntryBuf, c, err := accountsDb.Index.Get(a.Key[:])
existingIdxEntry, err := getAccountIndexEntry(accountsDb.Index, a.Key)
if errors.Is(err, pebble.ErrNotFound) {
lengthChangedAccounts <- a
return nil
}
if err != nil {
return fmt.Errorf("reading from index: %w", err)
}
existingIdxEntry, err := unmarshalAcctIdxEntry(existingacctIdxEntryBuf)
c.Close()
if err != nil {
return fmt.Errorf("unmarshaling index entry: %w", err)
}

existingAppendVecFileName := fmt.Sprintf("%s/%d.%d", accountsDb.AcctsDir, existingIdxEntry.Slot, existingIdxEntry.FileId)
existingAppendVecFile, err := os.OpenFile(existingAppendVecFileName, os.O_RDWR, 0666)
Expand Down Expand Up @@ -559,7 +526,18 @@ func (accountsDb *AccountsDb) parallelStoreAccounts(n int, accts []*accounts.Acc
})
}
newAppendVecGroup := errgroup.Group{}
newAppendVecGroup.Go(func() error {
newAppendVecGroup.Go(func() (err error) {
b := accountsDb.Index.NewBatch()
defer func() {
if b.Empty() {
return
}
e := b.Commit(&pebble.WriteOptions{})
if e != nil {
err = errors.Join(err, e)
}
}()

fileId := accountsDb.LargestFileId.Add(1)
appendVecFileName := fmt.Sprintf("%s/%d.%d", accountsDb.AcctsDir, slot, fileId)
appendVecFile, err := os.OpenFile(appendVecFileName, os.O_RDWR|os.O_CREATE, 0666)
Expand All @@ -571,12 +549,10 @@ func (accountsDb *AccountsDb) parallelStoreAccounts(n int, accts []*accounts.Acc
defer appendVecWriter.Flush()

appendVecFileOffset := uint64(0)
var acctIdxEntryBuf [24]byte

for acct := range lengthChangedAccounts {
indexEntry := AccountIndexEntry{Slot: slot, FileId: fileId, Offset: appendVecFileOffset}
indexEntry.Marshal(&acctIdxEntryBuf)
err = accountsDb.Index.Set(acct.Key[:], acctIdxEntryBuf[:], &pebble.WriteOptions{})
err = setAccountIndexEntry(b, acct.Key, indexEntry)
if err != nil {
return fmt.Errorf("unable to add acct for %s to acctsdb: %v", acct.Key, err)
}
Expand All @@ -596,7 +572,6 @@ func (accountsDb *AccountsDb) parallelStoreAccounts(n int, accts []*accounts.Acc
}
appendVecFileOffset += uint64(l)
}

return nil
})

Expand All @@ -609,22 +584,11 @@ func (accountsDb *AccountsDb) parallelStoreAccounts(n int, accts []*accounts.Acc
}

func (accountsDb *AccountsDb) GetBankHashForSlot(slot uint64) ([]byte, error) {
var slotBytes [8]byte
binary.LittleEndian.PutUint64(slotBytes[:], slot)
bh, c, err := accountsDb.BankHashStore.Get(slotBytes[:])
if err != nil {
return nil, fmt.Errorf("GetBankHashForSlot slot=%d: %w", slot, err)
}
out := make([]byte, len(bh))
copy(out, bh)
c.Close()
return out, nil
return getBankhashForSlot(accountsDb.Index, slot)
}

func (accountsDb *AccountsDb) StoreBankHashForSlot(slot uint64, bankHash []byte) error {
var slotBytes [8]byte
binary.LittleEndian.PutUint64(slotBytes[:], slot)
return accountsDb.BankHashStore.Set(slotBytes[:], bankHash, &pebble.WriteOptions{})
func (accountsDb *AccountsDb) StoreBankHashForSlot(slot uint64, bh []byte) error {
return setBankhashForSlot(accountsDb.Index, slot, bh)
}

func (accountsDb *AccountsDb) KeysBetweenPrefixes(startPrefix uint64, endPrefix uint64) []solana.PublicKey {
Expand Down
63 changes: 63 additions & 0 deletions pkg/accountsdb/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,17 @@ import (
"fmt"

"github.com/Overclock-Validator/mithril/pkg/addresses"
"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/sstable"
"github.com/gagliardetto/solana-go"
)

// Prefixes to separate keyspaces within the same pebble.DB.
// Each stored key is one of these single-byte prefixes followed by the raw key bytes.
var (
// accountPrefix is prepended to an account's public key bytes to form the
// key of that account's index entry.
accountPrefix = []byte{0x00}
// slotBankhashPrefix is prepended to a slot number (8 bytes, little-endian)
// to form the key of that slot's bank hash.
slotBankhashPrefix = []byte{0x01}
)

type AccountIndexEntry struct {
Slot uint64
FileId uint64
Expand Down Expand Up @@ -66,3 +74,58 @@ func BuildIndexEntriesFromAppendVecs(data []byte, fileSize uint64, slot uint64,

return pubkeys, acctIdxEntries, stakePubkeys, nil
}

// getAccountIndexEntry looks up and decodes the index entry stored for account
// k in index. The lookup key is accountPrefix followed by the pubkey bytes.
//
// Both lookup and decode failures are returned wrapped with %w, so callers can
// still match the underlying cause, e.g. errors.Is(err, pebble.ErrNotFound).
func getAccountIndexEntry(index *pebble.DB, k solana.PublicKey) (*AccountIndexEntry, error) {
	var key [33]byte
	n := copy(key[:], accountPrefix)
	copy(key[n:], k[:])

	raw, closer, err := index.Get(key[:])
	if err != nil {
		return nil, fmt.Errorf("getting account index entry for k=%s: %w", k, err)
	}
	// Release the value handed out by Get once decoding is finished.
	defer closer.Close()

	entry, err := unmarshalAcctIdxEntry(raw)
	if err != nil {
		return nil, fmt.Errorf("parsing account index entry for k=%s v=%x: %w", k, raw, err)
	}
	return entry, nil
}

// setAccountIndexEntry stores the 24-byte marshaled form of a under the
// account-keyspace key for k (accountPrefix followed by the pubkey bytes).
// index is any pebble.Writer, so the write can target the database directly
// or be collected in a batch.
func setAccountIndexEntry(index pebble.Writer, k solana.PublicKey, a AccountIndexEntry) error {
	var (
		key   [33]byte
		value [24]byte
	)
	a.Marshal(&value)

	n := copy(key[:], accountPrefix)
	copy(key[n:], k[:])

	return index.Set(key[:], value[:], &pebble.WriteOptions{})
}

// SSTSetAccountIndexEntry writes the index entry a for account k to the
// sstable writer sst. It uses the same key layout (accountPrefix followed by
// the pubkey bytes) and the same 24-byte value encoding as
// setAccountIndexEntry.
func SSTSetAccountIndexEntry(sst *sstable.Writer, k solana.PublicKey, a AccountIndexEntry) error {
	var (
		key   [33]byte
		value [24]byte
	)
	a.Marshal(&value)

	n := copy(key[:], accountPrefix)
	copy(key[n:], k[:])

	return sst.Set(key[:], value[:])
}

// getBankhashForSlot returns a copy of the bank hash stored in index for slot.
// The lookup key is slotBankhashPrefix followed by the slot number as 8
// little-endian bytes.
//
// A failed lookup is returned wrapped with %w, so callers can still match the
// underlying cause with errors.Is.
func getBankhashForSlot(index *pebble.DB, slot uint64) ([]byte, error) {
	var key [9]byte
	n := copy(key[:], slotBankhashPrefix)
	binary.LittleEndian.PutUint64(key[n:], slot)

	stored, closer, err := index.Get(key[:])
	if err != nil {
		return nil, fmt.Errorf("GetBankHashForSlot slot=%d: %w", slot, err)
	}
	// The copy below is taken before the deferred Close runs.
	defer closer.Close()

	out := make([]byte, len(stored))
	copy(out, stored)
	return out, nil
}

// setBankhashForSlot stores bh as the bank hash for slot. The key is
// slotBankhashPrefix followed by the slot number as 8 little-endian bytes.
// index is any pebble.Writer.
func setBankhashForSlot(index pebble.Writer, slot uint64, bh []byte) error {
	var key [9]byte
	n := copy(key[:], slotBankhashPrefix)
	binary.LittleEndian.PutUint64(key[n:], slot)

	return index.Set(key[:], bh, &pebble.WriteOptions{})
}
4 changes: 1 addition & 3 deletions pkg/snapshot/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,6 @@ func (s *shard) logToSST(ctx context.Context) error {
return 1
}
})
var vBytes [vlen]byte
// Write to SST
sstFilename := fmt.Sprintf("%s.sst", filename)
sstFile, err := vfs.Default.Create(sstFilename)
Expand All @@ -243,8 +242,7 @@ func (s *shard) logToSST(ctx context.Context) error {
if lastWritten >= 0 && bytes.Equal(kv.k[:], pairs[lastWritten].k[:]) {
continue
}
kv.v.Marshal(&vBytes)
if err := w.Set(kv.k[:], vBytes[:]); err != nil {
if err := accountsdb.SSTSetAccountIndexEntry(w, kv.k, kv.v); err != nil {
return fmt.Errorf("writing to SST: %w", err)
}
lastWritten = i
Expand Down