diff --git a/pkg/accountsdb/accountsdb.go b/pkg/accountsdb/accountsdb.go index 24deb284..0f9bb79c 100644 --- a/pkg/accountsdb/accountsdb.go +++ b/pkg/accountsdb/accountsdb.go @@ -27,7 +27,6 @@ import ( type AccountsDb struct { Index *pebble.DB - BankHashStore *pebble.DB AcctsDir string LargestFileId atomic.Uint64 VoteAcctCache otter.Cache[solana.PublicKey, *accounts.Account] @@ -103,13 +102,7 @@ func OpenDb(accountsDbDir string) (*AccountsDb, error) { return nil, fmt.Errorf("opening indexDir=%s: %w", indexDir, err) } - bankhashDir := filepath.Join(accountsDbDir, "bankhash_db") - bankhashDb, err := pebble.Open(bankhashDir, &pebble.Options{Logger: silentLogger{}}) - if err != nil { - return nil, fmt.Errorf("opening bankhashDir=%s: %w", bankhashDir, err) - } - - accountsDb := &AccountsDb{Index: db, BankHashStore: bankhashDb, AcctsDir: appendVecsDir} + accountsDb := &AccountsDb{Index: db, AcctsDir: appendVecsDir} accountsDb.LargestFileId.Store(largestFileId) mlog.Log.Infof("StoreAsync=%t", StoreAsync) @@ -145,10 +138,6 @@ func (accountsDb *AccountsDb) CloseDb() { if err := accountsDb.Index.Close(); err != nil { mlog.Log.Errorf("CloseDb: Index.Close() error: %v", err) } - mlog.Log.Infof("CloseDb: syncing and closing BankHashStore...") - if err := accountsDb.BankHashStore.Close(); err != nil { - mlog.Log.Errorf("CloseDb: BankHashStore.Close() error: %v", err) - } mlog.Log.Infof("CloseDb: done\n") // extra newline for spacing after close } @@ -225,17 +214,14 @@ func (accountsDb *AccountsDb) getStoredAccount(slot uint64, pubkey solana.Public r.End() defer trace.StartRegion(context.Background(), "GetStoredAccountDisk").End() - acctIdxEntryBytes, c, err := accountsDb.Index.Get(pubkey[:]) - if err != nil { + acctIdxEntry, err := getAccountIndexEntry(accountsDb.Index, pubkey) + if errors.Is(err, pebble.ErrNotFound) { //mlog.Log.Debugf("no account found in accountsdb for pubkey %s: %s", pubkey, err) return nil, ErrNoAccount } - - acctIdxEntry, err := unmarshalAcctIdxEntry(acctIdxEntryBytes) if err != nil { - panic("failed to unmarshal AccountIndexEntry from index kv database") + panic(err) } - c.Close() appendVecFileName := fmt.Sprintf("%s/%d.%d", accountsDb.AcctsDir, acctIdxEntry.Slot, acctIdxEntry.FileId) @@ -385,34 +371,19 @@ func (accountsDb *AccountsDb) storeAccountsInternal(accts []*accounts.Account, s defer appendVecFile.Close() appendVecAcctsBuf := new(bytes.Buffer) - writer := new(bytes.Buffer) - var acctIdxEntryBuf [24]byte for _, acct := range accts { if acct == nil { continue } - // create index entry, encode it and write it to the index kv store - // offset field is specified as the current num of bytes written to the appendvec buffer. - writer.Reset() - - indexEntry := AccountIndexEntry{Slot: slot, FileId: fileId, Offset: uint64(appendVecAcctsBuf.Len())} - indexEntry.Marshal(&acctIdxEntryBuf) - // if an entry already existed in the index for this account, very often we can simply make the state update // in-place, i.e. into the account's existing appendvec blob. // we can make the account state update in-place iff the existing version's data length is the same as the // new version's data length, which is the case about 98% of the time. // if not, then we write out a new appendvec. - existingacctIdxEntryBuf, c, err := accountsDb.Index.Get(acct.Key[:]) + acctIdxEntry, err := getAccountIndexEntry(accountsDb.Index, acct.Key) if err == nil { - acctIdxEntry, err := unmarshalAcctIdxEntry(existingacctIdxEntryBuf) - if err != nil { - panic("failed to unmarshal AccountIndexEntry from index kv database") - } - c.Close() - existingAppendVecFileName := fmt.Sprintf("%s/%d.%d", accountsDb.AcctsDir, acctIdxEntry.Slot, acctIdxEntry.FileId) existingAppendVecFile, err := os.OpenFile(existingAppendVecFileName, os.O_RDWR, 0666) if err != nil { @@ -448,9 +419,10 @@ func (accountsDb *AccountsDb) storeAccountsInternal(accts []*accounts.Account, s } } - err = accountsDb.Index.Set(acct.Key[:], acctIdxEntryBuf[:], &pebble.WriteOptions{}) + indexEntry := AccountIndexEntry{Slot: slot, FileId: fileId, Offset: uint64(appendVecAcctsBuf.Len())} + err = setAccountIndexEntry(accountsDb.Index, acct.Key, indexEntry) if err != nil { - panic(fmt.Sprintf("unable to add acct for %s to acctsdb: %v", acct.Key, err)) + panic(fmt.Sprintf("unable to add acct for %s to acctsdb: %v", acct.Key.String(), err)) } // marshal up the account as an appendvec style account and write it to the buffer @@ -501,7 +473,7 @@ func (accountsDb *AccountsDb) parallelStoreAccounts(n int, accts []*accounts.Acc return ctx.Err() } err := func(a *accounts.Account) error { - existingacctIdxEntryBuf, c, err := accountsDb.Index.Get(a.Key[:]) + existingIdxEntry, err := getAccountIndexEntry(accountsDb.Index, a.Key) if errors.Is(err, pebble.ErrNotFound) { lengthChangedAccounts <- a return nil @@ -509,11 +481,6 @@ func (accountsDb *AccountsDb) parallelStoreAccounts(n int, accts []*accounts.Acc if err != nil { return fmt.Errorf("reading from index: %w", err) } - existingIdxEntry, err := unmarshalAcctIdxEntry(existingacctIdxEntryBuf) - c.Close() - if err != nil { - return fmt.Errorf("unmarshaling index entry: %w", err) - } existingAppendVecFileName := fmt.Sprintf("%s/%d.%d", accountsDb.AcctsDir, existingIdxEntry.Slot, existingIdxEntry.FileId) existingAppendVecFile, err := os.OpenFile(existingAppendVecFileName, os.O_RDWR, 0666) @@ -559,7 +526,18 @@ func (accountsDb *AccountsDb) parallelStoreAccounts(n int, accts []*accounts.Acc }) } newAppendVecGroup := errgroup.Group{} - newAppendVecGroup.Go(func() error { + newAppendVecGroup.Go(func() (err error) { + b := accountsDb.Index.NewBatch() + defer func() { + if b.Empty() { + return + } + e := b.Commit(&pebble.WriteOptions{}) + if e != nil { + err = errors.Join(err, e) + } + }() + fileId := accountsDb.LargestFileId.Add(1) appendVecFileName := fmt.Sprintf("%s/%d.%d", accountsDb.AcctsDir, slot, fileId) appendVecFile, err := os.OpenFile(appendVecFileName, os.O_RDWR|os.O_CREATE, 0666) @@ -571,12 +549,10 @@ func (accountsDb *AccountsDb) parallelStoreAccounts(n int, accts []*accounts.Acc defer appendVecWriter.Flush() appendVecFileOffset := uint64(0) - var acctIdxEntryBuf [24]byte for acct := range lengthChangedAccounts { indexEntry := AccountIndexEntry{Slot: slot, FileId: fileId, Offset: appendVecFileOffset} - indexEntry.Marshal(&acctIdxEntryBuf) - err = accountsDb.Index.Set(acct.Key[:], acctIdxEntryBuf[:], &pebble.WriteOptions{}) + err = setAccountIndexEntry(b, acct.Key, indexEntry) if err != nil { return fmt.Errorf("unable to add acct for %s to acctsdb: %v", acct.Key, err) } @@ -596,7 +572,6 @@ func (accountsDb *AccountsDb) parallelStoreAccounts(n int, accts []*accounts.Acc } appendVecFileOffset += uint64(l) } - return nil }) @@ -609,22 +584,11 @@ func (accountsDb *AccountsDb) parallelStoreAccounts(n int, accts []*accounts.Acc } func (accountsDb *AccountsDb) GetBankHashForSlot(slot uint64) ([]byte, error) { - var slotBytes [8]byte - binary.LittleEndian.PutUint64(slotBytes[:], slot) - bh, c, err := accountsDb.BankHashStore.Get(slotBytes[:]) - if err != nil { - return nil, fmt.Errorf("GetBankHashForSlot slot=%d: %w", slot, err) - } - out := make([]byte, len(bh)) - copy(out, bh) - c.Close() - return out, nil + return getBankhashForSlot(accountsDb.Index, slot) } -func (accountsDb *AccountsDb) StoreBankHashForSlot(slot uint64, bankHash []byte) error { - var slotBytes [8]byte - binary.LittleEndian.PutUint64(slotBytes[:], slot) - return accountsDb.BankHashStore.Set(slotBytes[:], bankHash, &pebble.WriteOptions{}) +func (accountsDb *AccountsDb) StoreBankHashForSlot(slot uint64, bh []byte) error { + return setBankhashForSlot(accountsDb.Index, slot, bh) } func (accountsDb *AccountsDb) KeysBetweenPrefixes(startPrefix uint64, endPrefix uint64) []solana.PublicKey { diff --git a/pkg/accountsdb/index.go b/pkg/accountsdb/index.go index 1d670b75..57a8d6c6 100644 --- a/pkg/accountsdb/index.go +++ b/pkg/accountsdb/index.go @@ -6,9 +6,17 @@ import ( "fmt" "github.com/Overclock-Validator/mithril/pkg/addresses" + "github.com/cockroachdb/pebble" + "github.com/cockroachdb/pebble/sstable" "github.com/gagliardetto/solana-go" ) +// Prefixes to separate keyspaces within the same pebble.DB +var ( + accountPrefix = []byte{0x00} + slotBankhashPrefix = []byte{0x01} +) + type AccountIndexEntry struct { Slot uint64 FileId uint64 @@ -66,3 +74,58 @@ func BuildIndexEntriesFromAppendVecs(data []byte, fileSize uint64, slot uint64, return pubkeys, acctIdxEntries, stakePubkeys, nil } + +func getAccountIndexEntry(index *pebble.DB, k solana.PublicKey) (*AccountIndexEntry, error) { + var prefixedK [33]byte + copy(prefixedK[:len(accountPrefix)], accountPrefix) + copy(prefixedK[len(accountPrefix):], k[:]) + v, c, err := index.Get(prefixedK[:]) + if err != nil { + return nil, fmt.Errorf("getting account index entry for k=%s: %w", k, err) + } + defer c.Close() + a, err := unmarshalAcctIdxEntry(v) + if err != nil { + return nil, fmt.Errorf("parsing account index entry for k=%s v=%x: %w", k, v, err) + } + return a, nil +} + +func setAccountIndexEntry(index pebble.Writer, k solana.PublicKey, a AccountIndexEntry) error { + var prefixedK [33]byte + copy(prefixedK[:len(accountPrefix)], accountPrefix) + copy(prefixedK[len(accountPrefix):], k[:]) + var buf [24]byte + a.Marshal(&buf) + return index.Set(prefixedK[:], buf[:], &pebble.WriteOptions{}) +} + +func SSTSetAccountIndexEntry(sst *sstable.Writer, k solana.PublicKey, a AccountIndexEntry) error { + var prefixedK [33]byte + copy(prefixedK[:len(accountPrefix)], accountPrefix) + copy(prefixedK[len(accountPrefix):], k[:]) + var buf [24]byte + a.Marshal(&buf) + return sst.Set(prefixedK[:], buf[:]) +} + +func getBankhashForSlot(index *pebble.DB, slot uint64) ([]byte, error) { + var prefixedSlot [9]byte + copy(prefixedSlot[:len(slotBankhashPrefix)], slotBankhashPrefix) + binary.LittleEndian.PutUint64(prefixedSlot[len(slotBankhashPrefix):], slot) + bh, c, err := index.Get(prefixedSlot[:]) + if err != nil { + return nil, fmt.Errorf("GetBankHashForSlot slot=%d: %w", slot, err) + } + out := make([]byte, len(bh)) + copy(out, bh) + c.Close() + return out, nil +} + +func setBankhashForSlot(index pebble.Writer, slot uint64, bh []byte) error { + var prefixedSlot [9]byte + copy(prefixedSlot[:len(slotBankhashPrefix)], slotBankhashPrefix) + binary.LittleEndian.PutUint64(prefixedSlot[len(slotBankhashPrefix):], slot) + return index.Set(prefixedSlot[:], bh, &pebble.WriteOptions{}) +} diff --git a/pkg/snapshot/shard.go b/pkg/snapshot/shard.go index 2290eb93..3a32f83f 100644 --- a/pkg/snapshot/shard.go +++ b/pkg/snapshot/shard.go @@ -228,7 +228,6 @@ func (s *shard) logToSST(ctx context.Context) error { return 1 } }) - var vBytes [vlen]byte // Write to SST sstFilename := fmt.Sprintf("%s.sst", filename) sstFile, err := vfs.Default.Create(sstFilename) @@ -243,8 +242,7 @@ func (s *shard) logToSST(ctx context.Context) error { if lastWritten >= 0 && bytes.Equal(kv.k[:], pairs[lastWritten].k[:]) { continue } - kv.v.Marshal(&vBytes) - if err := w.Set(kv.k[:], vBytes[:]); err != nil { + if err := accountsdb.SSTSetAccountIndexEntry(w, kv.k, kv.v); err != nil { return fmt.Errorf("writing to SST: %w", err) } lastWritten = i