Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: blockchain.transaction.yyy JSON RPC implementations #78

Merged
merged 12 commits into from
Dec 6, 2022
Merged
18 changes: 12 additions & 6 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,8 +441,8 @@ func (db *ReadOnlyDBColumnFamily) selectFrom(prefix []byte, startKey, stopKey pr
// Prefix and handle
options := NewIterateOptions().WithDB(db).WithPrefix(prefix).WithCfHandle(handle)
// Start and stop bounds
options = options.WithStart(startKey.PackKey()).WithStop(stopKey.PackKey()).WithIncludeStop(true)
// Don't include the key
options = options.WithStart(startKey.PackKey()).WithStop(stopKey.PackKey()).WithIncludeStop(false)
// Include the key and value
options = options.WithIncludeKey(true).WithIncludeValue(true)
return []*IterOptions{options}, nil
}
Expand All @@ -455,7 +455,7 @@ func iterate(db *grocksdb.DB, opts []*IterOptions) <-chan []*prefixes.PrefixRowK
for kv := range IterCF(db, o) {
row := make([]*prefixes.PrefixRowKV, 0, 1)
row = append(row, kv)
log.Debugf("iterate[%v][%v] %#v", i, j, kv)
log.Debugf("iterate[%v][%v] %#v -> %#v", i, j, kv.Key, kv.Value)
out <- row
j++
}
Expand All @@ -481,7 +481,7 @@ func innerJoin(db *grocksdb.DB, in <-chan []*prefixes.PrefixRowKV, selectFn func
row = append(row, kvs...)
row = append(row, kv...)
for i, kv := range row {
log.Debugf("row[%v] %#v", i, kv)
log.Debugf("row[%v] %#v -> %#v", i, kv.Key, kv.Value)
}
out <- row
}
Expand Down Expand Up @@ -579,6 +579,7 @@ func GetDBColumnFamilies(name string, secondayPath string, cfNames []string, grp
// db, handles, err := grocksdb.OpenDbColumnFamilies(opts, name, cfNames, cfOpts)

if err != nil {
log.Errorf("open db as secondary failed: %v", err)
return nil, err
}

Expand Down Expand Up @@ -685,7 +686,7 @@ func (db *ReadOnlyDBColumnFamily) RunDetectChanges(notifCh chan<- interface{}) {
for {
// FIXME: Figure out best sleep interval
if time.Since(lastPrint) > time.Second {
log.Debug("DetectChanges:", db.LastState)
log.Debugf("DetectChanges: %#v", db.LastState)
lastPrint = time.Now()
}
err := db.detectChanges(notifCh)
Expand Down Expand Up @@ -775,7 +776,12 @@ func (db *ReadOnlyDBColumnFamily) detectChanges(notifCh chan<- interface{}) erro
log.Info("error getting block hash: ", err)
return err
}
notifCh <- &internal.HeightHash{Height: uint64(height), BlockHash: hash}
header, err := db.GetHeader(height)
if err != nil {
log.Info("error getting block header: ", err)
return err
}
notifCh <- &internal.HeightHash{Height: uint64(height), BlockHash: hash, BlockHeader: header}
}
//TODO: ClearCache
log.Warn("implement cache clearing")
Expand Down
218 changes: 217 additions & 1 deletion db/db_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,19 @@ package db
// db_get.go contains the basic access functions to the database.

import (
"bytes"
"crypto/sha256"
"encoding/hex"
"fmt"
"log"
"math"

"github.com/lbryio/herald.go/db/prefixes"
"github.com/lbryio/herald.go/db/stack"
"github.com/lbryio/lbcd/chaincfg/chainhash"
"github.com/lbryio/lbcd/wire"
"github.com/linxGnu/grocksdb"

log "github.com/sirupsen/logrus"
)

// GetExpirationHeight returns the expiration height for the given height. Uses
Expand Down Expand Up @@ -65,6 +68,31 @@ func (db *ReadOnlyDBColumnFamily) GetBlockHash(height uint32) ([]byte, error) {
return rawValue, nil
}

func (db *ReadOnlyDBColumnFamily) GetBlockTXs(height uint32) ([]*chainhash.Hash, error) {
handle, err := db.EnsureHandle(prefixes.BlockTXs)
if err != nil {
return nil, err
}

key := prefixes.BlockTxsKey{
Prefix: []byte{prefixes.BlockTXs},
Height: height,
}
slice, err := db.DB.GetCF(db.Opts, handle, key.PackKey())
defer slice.Free()
if err != nil {
return nil, err
}
if slice.Size() == 0 {
return nil, nil
}

rawValue := make([]byte, len(slice.Data()))
copy(rawValue, slice.Data())
value := prefixes.BlockTxsValueUnpack(rawValue)
return value.TxHashes, nil
}

func (db *ReadOnlyDBColumnFamily) GetHeader(height uint32) ([]byte, error) {
handle, err := db.EnsureHandle(prefixes.Header)
if err != nil {
Expand Down Expand Up @@ -271,6 +299,7 @@ func (db *ReadOnlyDBColumnFamily) GetStatus(hashX []byte) ([]byte, error) {
// Lookup in HashXMempoolStatus first.
status, err := db.getMempoolStatus(hashX)
if err == nil && status != nil {
log.Debugf("(mempool) status(%#v) -> %#v", hashX, status)
return status, err
}

Expand All @@ -291,6 +320,7 @@ func (db *ReadOnlyDBColumnFamily) GetStatus(hashX []byte) ([]byte, error) {
copy(rawValue, slice.Data())
value := prefixes.HashXStatusValue{}
value.UnpackValue(rawValue)
log.Debugf("status(%#v) -> %#v", hashX, value.Status)
return value.Status, nil
}

Expand All @@ -299,6 +329,11 @@ func (db *ReadOnlyDBColumnFamily) GetStatus(hashX []byte) ([]byte, error) {
if err != nil {
return nil, err
}

if len(txs) == 0 {
return []byte{}, err
}

hash := sha256.New()
for _, tx := range txs {
hash.Write([]byte(fmt.Sprintf("%s:%d:", tx.TxHash.String(), tx.Height)))
Expand Down Expand Up @@ -731,6 +766,70 @@ func (db *ReadOnlyDBColumnFamily) FsGetClaimByHash(claimHash []byte) (*ResolveRe
)
}

func (db *ReadOnlyDBColumnFamily) GetTx(txhash *chainhash.Hash) ([]byte, *wire.MsgTx, error) {
// Lookup in MempoolTx first.
raw, tx, err := db.getMempoolTx(txhash)
if err == nil && raw != nil && tx != nil {
return raw, tx, err
}

handle, err := db.EnsureHandle(prefixes.Tx)
if err != nil {
return nil, nil, err
}

key := prefixes.TxKey{Prefix: []byte{prefixes.Tx}, TxHash: txhash}
rawKey := key.PackKey()
slice, err := db.DB.GetCF(db.Opts, handle, rawKey)
defer slice.Free()
if err != nil {
return nil, nil, err
}
if slice.Size() == 0 {
return nil, nil, nil
}

rawValue := make([]byte, len(slice.Data()))
copy(rawValue, slice.Data())
value := prefixes.TxValue{}
value.UnpackValue(rawValue)
var msgTx wire.MsgTx
err = msgTx.Deserialize(bytes.NewReader(value.RawTx))
if err != nil {
return nil, nil, err
}
return value.RawTx, &msgTx, nil
}

func (db *ReadOnlyDBColumnFamily) getMempoolTx(txhash *chainhash.Hash) ([]byte, *wire.MsgTx, error) {
handle, err := db.EnsureHandle(prefixes.MempoolTx)
if err != nil {
return nil, nil, err
}

key := prefixes.MempoolTxKey{Prefix: []byte{prefixes.Tx}, TxHash: txhash}
rawKey := key.PackKey()
slice, err := db.DB.GetCF(db.Opts, handle, rawKey)
defer slice.Free()
if err != nil {
return nil, nil, err
}
if slice.Size() == 0 {
return nil, nil, nil
}

rawValue := make([]byte, len(slice.Data()))
copy(rawValue, slice.Data())
value := prefixes.MempoolTxValue{}
value.UnpackValue(rawValue)
var msgTx wire.MsgTx
err = msgTx.Deserialize(bytes.NewReader(value.RawTx))
if err != nil {
return nil, nil, err
}
return value.RawTx, &msgTx, nil
}

func (db *ReadOnlyDBColumnFamily) GetTxCount(height uint32) (*prefixes.TxCountValue, error) {
handle, err := db.EnsureHandle(prefixes.TxCount)
if err != nil {
Expand All @@ -754,6 +853,123 @@ func (db *ReadOnlyDBColumnFamily) GetTxCount(height uint32) (*prefixes.TxCountVa
return value, nil
}

func (db *ReadOnlyDBColumnFamily) GetTxHeight(txhash *chainhash.Hash) (uint32, error) {
handle, err := db.EnsureHandle(prefixes.TxNum)
if err != nil {
return 0, err
}

key := prefixes.TxNumKey{Prefix: []byte{prefixes.TxNum}, TxHash: txhash}
rawKey := key.PackKey()
slice, err := db.DB.GetCF(db.Opts, handle, rawKey)
defer slice.Free()
if err != nil {
return 0, err
}
if slice.Size() == 0 {
return 0, nil
}

// No slice copy needed. Value will be abandoned.
value := prefixes.TxNumValueUnpack(slice.Data())
height := stack.BisectRight(db.TxCounts, []uint32{value.TxNum})[0]
return height, nil
}

type TxMerkle struct {
TxHash *chainhash.Hash
RawTx []byte
Height int
Pos uint32
Merkle []*chainhash.Hash
}

// merklePath selects specific transactions by position within blockTxs.
// The resulting merkle path (aka merkle branch, or merkle) is a list of TX hashes
// which are in sibling relationship with TX nodes on the path to the root.
func merklePath(pos uint32, blockTxs, partial []*chainhash.Hash) []*chainhash.Hash {
parent := func(p uint32) uint32 {
return p >> 1
}
sibling := func(p uint32) uint32 {
if p%2 == 0 {
return p + 1
} else {
return p - 1
}
}
p := parent(pos)
if p == 0 {
// No parent, path is complete.
return partial
}
// Add sibling to partial path and proceed to parent TX.
return merklePath(p, blockTxs, append(partial, blockTxs[sibling(pos)]))
}

func (db *ReadOnlyDBColumnFamily) GetTxMerkle(tx_hashes []chainhash.Hash) ([]TxMerkle, error) {

selectedTxNum := make([]*IterOptions, 0, len(tx_hashes))
for _, txhash := range tx_hashes {
key := prefixes.TxNumKey{Prefix: []byte{prefixes.TxNum}, TxHash: &txhash}
log.Debugf("%v", key)
opt, err := db.selectFrom(key.Prefix, &key, &key)
if err != nil {
return nil, err
}
selectedTxNum = append(selectedTxNum, opt...)
}

selectTxByTxNum := func(in []*prefixes.PrefixRowKV) ([]*IterOptions, error) {
txNumKey := in[0].Key.(*prefixes.TxNumKey)
log.Debugf("%v", txNumKey.TxHash.String())
out := make([]*IterOptions, 0, 100)
startKey := &prefixes.TxKey{
Prefix: []byte{prefixes.Tx},
TxHash: txNumKey.TxHash,
}
endKey := &prefixes.TxKey{
Prefix: []byte{prefixes.Tx},
TxHash: txNumKey.TxHash,
}
selectedTx, err := db.selectFrom([]byte{prefixes.Tx}, startKey, endKey)
if err != nil {
return nil, err
}
out = append(out, selectedTx...)
return out, nil
}

blockTxsCache := make(map[uint32][]*chainhash.Hash)
results := make([]TxMerkle, 0, 500)
for kvs := range innerJoin(db.DB, iterate(db.DB, selectedTxNum), selectTxByTxNum) {
if err := checkForError(kvs); err != nil {
return results, err
}
txNumKey, txNumVal := kvs[0].Key.(*prefixes.TxNumKey), kvs[0].Value.(*prefixes.TxNumValue)
_, txVal := kvs[1].Key.(*prefixes.TxKey), kvs[1].Value.(*prefixes.TxValue)
txHeight := stack.BisectRight(db.TxCounts, []uint32{txNumVal.TxNum})[0]
txPos := txNumVal.TxNum - db.TxCounts.Get(txHeight-1)
// We need all the TX hashes in order to select out the relevant ones.
if _, ok := blockTxsCache[txHeight]; !ok {
txs, err := db.GetBlockTXs(txHeight)
if err != nil {
return results, err
}
blockTxsCache[txHeight] = txs
}
blockTxs, _ := blockTxsCache[txHeight]
results = append(results, TxMerkle{
TxHash: txNumKey.TxHash,
RawTx: txVal.RawTx,
Height: int(txHeight),
Pos: txPos,
Merkle: merklePath(txPos, blockTxs, []*chainhash.Hash{}),
})
}
return results, nil
}

func (db *ReadOnlyDBColumnFamily) GetDBState() (*prefixes.DBStateValue, error) {
handle, err := db.EnsureHandle(prefixes.DBState)
if err != nil {
Expand Down
29 changes: 29 additions & 0 deletions db/prefixes/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strings"

"github.com/go-restruct/restruct"
"github.com/lbryio/herald.go/internal"
"github.com/lbryio/lbcd/chaincfg/chainhash"
)

Expand Down Expand Up @@ -59,6 +60,34 @@ func (kv *BlockTxsValue) Unpack(buf []byte, order binary.ByteOrder) ([]byte, err
return buf[offset:], nil
}

// Struct BigEndianChainHash is a chainhash.Hash stored in external
// byte-order (opposite of other 32 byte chainhash.Hash values). In order
// to reuse chainhash.Hash we need to correct the byte-order.
// Currently this type is used for field Genesis of DBStateValue.

func (kv *BigEndianChainHash) SizeOf() int {
return chainhash.HashSize
}

func (kv *BigEndianChainHash) Pack(buf []byte, order binary.ByteOrder) ([]byte, error) {
offset := 0
hash := kv.CloneBytes()
// HACK: Instances of chainhash.Hash use the internal byte-order.
// Python scribe writes bytes of genesis hash in external byte-order.
internal.ReverseBytesInPlace(hash)
offset += copy(buf[offset:chainhash.HashSize], hash[:])
return buf[offset:], nil
}

func (kv *BigEndianChainHash) Unpack(buf []byte, order binary.ByteOrder) ([]byte, error) {
offset := 0
offset += copy(kv.Hash[:], buf[offset:32])
// HACK: Instances of chainhash.Hash use the internal byte-order.
// Python scribe writes bytes of genesis hash in external byte-order.
internal.ReverseBytesInPlace(kv.Hash[:])
return buf[offset:], nil
}

func genericNew(prefix []byte, key bool) (interface{}, error) {
t, ok := prefixRegistry[prefix[0]]
if !ok {
Expand Down
Loading