Skip to content

Commit

Permalink
feat: replicate v1 changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Tangui-Bitfly committed Jan 22, 2025
1 parent 5a0cecf commit 2442614
Show file tree
Hide file tree
Showing 12 changed files with 794 additions and 590 deletions.
2 changes: 1 addition & 1 deletion backend/cmd/eth1indexer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func Run() {
return
}

transforms := make([]func(blk *types.Eth1Block, cache *freecache.Cache) (*types.BulkMutations, *types.BulkMutations, error), 0)
transforms := make([]db.TransformFunc, 0)
transforms = append(transforms,
bt.TransformBlock,
bt.TransformTx,
Expand Down
21 changes: 10 additions & 11 deletions backend/cmd/misc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ package misc
import (
"bytes"
"context"
"os"

"database/sql"
"encoding/base64"
"encoding/json"
"flag"
"fmt"
"math"
"math/big"
"net/http"
"os"
"strconv"
"strings"
"sync"
Expand All @@ -23,6 +23,13 @@ import (
"github.com/davecgh/go-spew/spew"
"github.com/ethereum/go-ethereum/common"
"github.com/go-redis/redis/v8"
_ "github.com/jackc/pgx/v5/stdlib"
"github.com/pkg/errors"
utilMath "github.com/protolambda/zrnt/eth2/util/math"
go_ens "github.com/wealdtech/go-ens/v3"
"golang.org/x/sync/errgroup"
"google.golang.org/api/option"

"github.com/gobitfly/beaconchain/cmd/misc/commands"
"github.com/gobitfly/beaconchain/cmd/misc/misctypes"
"github.com/gobitfly/beaconchain/pkg/commons/cache"
Expand All @@ -37,14 +44,6 @@ import (
"github.com/gobitfly/beaconchain/pkg/exporter/modules"
"github.com/gobitfly/beaconchain/pkg/exporter/services"
"github.com/gobitfly/beaconchain/pkg/notification"
_ "github.com/jackc/pgx/v5/stdlib"
"github.com/pkg/errors"
utilMath "github.com/protolambda/zrnt/eth2/util/math"
go_ens "github.com/wealdtech/go-ens/v3"
"golang.org/x/sync/errgroup"
"google.golang.org/api/option"

"flag"

"github.com/Gurpartap/storekit-go"
)
Expand Down Expand Up @@ -1607,7 +1606,7 @@ func indexOldEth1Blocks(startBlock uint64, endBlock uint64, batchSize uint64, co
return
}

transforms := make([]func(blk *types.Eth1Block, cache *freecache.Cache) (*types.BulkMutations, *types.BulkMutations, error), 0)
transforms := make([]db.TransformFunc, 0)

log.Infof("transformerFlag: %v", transformerFlag)
transformerList := strings.Split(transformerFlag, ",")
Expand Down
40 changes: 25 additions & 15 deletions backend/pkg/commons/db/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ import (

gcp_bigtable "cloud.google.com/go/bigtable"
"github.com/go-redis/redis/v8"
itypes "github.com/gobitfly/eth-rewards/types"

"github.com/gobitfly/beaconchain/pkg/commons/log"
"github.com/gobitfly/beaconchain/pkg/commons/types"
"github.com/gobitfly/beaconchain/pkg/commons/utils"
itypes "github.com/gobitfly/eth-rewards/types"

"golang.org/x/sync/errgroup"
"google.golang.org/api/option"
Expand Down Expand Up @@ -66,7 +67,7 @@ type Bigtable struct {

tableMachineMetrics *gcp_bigtable.Table

redisCache *redis.Client
redisCache RedisClient

LastAttestationCache map[uint64]uint64
LastAttestationCacheMux *sync.Mutex
Expand All @@ -78,7 +79,29 @@ type Bigtable struct {
machineMetricsQueuedWritesChan chan (types.BulkMutation)
}

type RedisClient interface {
SCard(ctx context.Context, key string) *redis.IntCmd
SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.BoolCmd
Pipeline() redis.Pipeliner
Get(ctx context.Context, key string) *redis.StringCmd
Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.StatusCmd
}

func InitBigtable(project, instance, chainId, redisAddress string) (*Bigtable, error) {
rdc := redis.NewClient(&redis.Options{
Addr: redisAddress,
ReadTimeout: time.Second * 20,
})
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
if err := rdc.Ping(ctx).Err(); err != nil {
return nil, err
}

return InitBigtableWithCache(ctx, project, instance, chainId, rdc)
}

func InitBigtableWithCache(ctx context.Context, project, instance, chainId string, rdc RedisClient) (*Bigtable, error) {
if utils.Config.Bigtable.Emulator {
if utils.Config.Bigtable.EmulatorHost == "" {
utils.Config.Bigtable.EmulatorHost = "127.0.0.1"
Expand All @@ -90,26 +113,13 @@ func InitBigtable(project, instance, chainId, redisAddress string) (*Bigtable, e
log.Fatal(err, "unable to set bigtable emulator environment variable", 0)
}
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()

poolSize := 50
btClient, err := gcp_bigtable.NewClient(ctx, project, instance, option.WithGRPCConnectionPool(poolSize))
// btClient, err := gcp_bigtable.NewClient(context.Background(), project, instance)

if err != nil {
return nil, err
}

rdc := redis.NewClient(&redis.Options{
Addr: redisAddress,
ReadTimeout: time.Second * 20,
})

if err := rdc.Ping(ctx).Err(); err != nil {
return nil, err
}

bt := &Bigtable{
client: btClient,
tableData: btClient.Open("data"),
Expand Down
48 changes: 41 additions & 7 deletions backend/pkg/commons/db/bigtable_eth1.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"math/big"
"sort"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -23,8 +24,6 @@ import (
"github.com/gobitfly/beaconchain/pkg/commons/types"
"github.com/gobitfly/beaconchain/pkg/commons/utils"

"strconv"

gcp_bigtable "cloud.google.com/go/bigtable"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/types/known/timestamppb"
Expand Down Expand Up @@ -711,7 +710,9 @@ func TimestampToBigtableTimeDesc(ts time.Time) string {
return fmt.Sprintf("%04d%02d%02d%02d%02d%02d", 9999-ts.Year(), 12-ts.Month(), 31-ts.Day(), 23-ts.Hour(), 59-ts.Minute(), 59-ts.Second())
}

func (bigtable *Bigtable) IndexEventsWithTransformers(start, end int64, transforms []func(blk *types.Eth1Block, cache *freecache.Cache) (bulkData *types.BulkMutations, bulkMetadataUpdates *types.BulkMutations, err error), concurrency int64, cache *freecache.Cache) error {
type TransformFunc func(blk *types.Eth1Block, cache *freecache.Cache) (bulkData *types.BulkMutations, bulkMetadataUpdates *types.BulkMutations, err error)

func (bigtable *Bigtable) IndexEventsWithTransformers(start, end int64, transforms []TransformFunc, concurrency int64, cache *freecache.Cache) error {
g := new(errgroup.Group)
g.SetLimit(int(concurrency))

Expand Down Expand Up @@ -1048,8 +1049,19 @@ func (bigtable *Bigtable) TransformTx(blk *types.Eth1Block, cache *freecache.Cac
BlobTxFee: blobFee,
BlobGasPrice: tx.GetBlobGasPrice(),
IsContractCreation: isContract,
ErrorMsg: tx.GetErrorMsg(),
ErrorMsg: "",
Status: types.StatusType(tx.Status),
}
for _, itx := range tx.Itx {
if itx.ErrorMsg != "" {
indexedTx.ErrorMsg = itx.ErrorMsg
if indexedTx.Status == types.StatusType_SUCCESS {
indexedTx.Status = types.StatusType_PARTIAL
}
break
}
}

// Mark Sender and Recipient for balance update
bigtable.markBalanceUpdate(indexedTx.From, []byte{0x0}, bulkMetadataUpdates, cache)
bigtable.markBalanceUpdate(indexedTx.To, []byte{0x0}, bulkMetadataUpdates, cache)
Expand Down Expand Up @@ -1135,9 +1147,16 @@ func (bigtable *Bigtable) TransformBlobTx(blk *types.Eth1Block, cache *freecache
GasPrice: tx.GetGasPrice(),
BlobTxFee: blobFee,
BlobGasPrice: tx.GetBlobGasPrice(),
ErrorMsg: tx.GetErrorMsg(),
ErrorMsg: "",
BlobVersionedHashes: tx.GetBlobVersionedHashes(),
}
for _, itx := range tx.Itx {
if itx.ErrorMsg != "" {
indexedTx.ErrorMsg = itx.ErrorMsg
break
}
}

// Mark Sender and Recipient for balance update
bigtable.markBalanceUpdate(indexedTx.From, []byte{0x0}, bulkMetadataUpdates, cache)
bigtable.markBalanceUpdate(indexedTx.To, []byte{0x0}, bulkMetadataUpdates, cache)
Expand Down Expand Up @@ -1240,7 +1259,7 @@ func (bigtable *Bigtable) TransformContract(blk *types.Eth1Block, cache *freecac
contractUpdate := &types.IsContractUpdate{
IsContract: itx.GetType() == "create",
// also use success status of enclosing transaction, as even successful sub-calls can still be reverted later in the tx
Success: itx.GetErrorMsg() == "" && tx.GetErrorMsg() == "",
Success: itx.GetErrorMsg() == "" && tx.GetStatus() == 1,
}
b, err := proto.Marshal(contractUpdate)
if err != nil {
Expand Down Expand Up @@ -1303,12 +1322,26 @@ func (bigtable *Bigtable) TransformItx(blk *types.Eth1Block, cache *freecache.Ca
}
iReversed := reversePaddedIndex(i, TX_PER_BLOCK_LIMIT)

var revertSource string
for j, itx := range tx.GetItx() {
if j >= ITX_PER_TX_LIMIT {
if j > ITX_PER_TX_LIMIT {
return nil, nil, fmt.Errorf("unexpected number of internal transactions in block expected at most %d but got: %v, tx: %x", ITX_PER_TX_LIMIT, j, tx.GetHash())
}
jReversed := reversePaddedIndex(j, ITX_PER_TX_LIMIT)

// check for error before skipping, otherwise we loose track of cascading reverts
var reverted bool
if itx.ErrorMsg != "" {
reverted = true
// only save the highest root revert
if revertSource == "" || !strings.HasPrefix(itx.Path, revertSource) {
revertSource = strings.TrimSuffix(itx.Path, "]")
}
}
if revertSource != "" && strings.HasPrefix(itx.Path, revertSource) {
reverted = true
}

if itx.Path == "[]" || bytes.Equal(itx.Value, []byte{0x0}) { // skip top level and empty calls
continue
}
Expand All @@ -1322,6 +1355,7 @@ func (bigtable *Bigtable) TransformItx(blk *types.Eth1Block, cache *freecache.Ca
From: itx.GetFrom(),
To: itx.GetTo(),
Value: itx.GetValue(),
Reverted: reverted,
}

bigtable.markBalanceUpdate(indexedItx.To, []byte{0x0}, bulkMetadataUpdates, cache)
Expand Down
4 changes: 3 additions & 1 deletion backend/pkg/commons/db/bigtable_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
gcp_bigtable "cloud.google.com/go/bigtable"
)

var ErrTableAlreadyExist = fmt.Errorf("aborting bigtable schema init as tables are already present")

func InitBigtableSchema() error {
tables := make(map[string]map[string]gcp_bigtable.GCPolicy)

Expand Down Expand Up @@ -74,7 +76,7 @@ func InitBigtableSchema() error {
}

if len(existingTables) > 0 {
return fmt.Errorf("aborting bigtable schema init as tables are already present")
return ErrTableAlreadyExist
}

for name, definition := range tables {
Expand Down
14 changes: 13 additions & 1 deletion backend/pkg/commons/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"github.com/ethereum/go-ethereum/common"

"github.com/gobitfly/beaconchain/pkg/commons/log"
"github.com/gobitfly/beaconchain/pkg/commons/metrics"

Expand All @@ -32,14 +33,25 @@ import (
"github.com/jackc/pgx/v5/stdlib"
)

type SQLReaderDb interface {
Close() error
Get(dest interface{}, query string, args ...interface{}) error
GetContext(ctx context.Context, dest interface{}, query string, args ...interface{}) error
Select(dest interface{}, query string, args ...interface{}) error
SelectContext(ctx context.Context, dest interface{}, query string, args ...interface{}) error
Query(query string, args ...any) (*sql.Rows, error)
Preparex(query string) (*sqlx.Stmt, error)
Rebind(query string) string
}

//go:embed migrations/*/*.sql
var EmbedMigrations embed.FS

var DBPGX *pgxpool.Conn

// DB is a pointer to the explorer-database
var WriterDb *sqlx.DB
var ReaderDb *sqlx.DB
var ReaderDb SQLReaderDb

var UserReader *sqlx.DB
var UserWriter *sqlx.DB
Expand Down
6 changes: 3 additions & 3 deletions backend/pkg/commons/db/ens.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ func (bigtable *Bigtable) TransformEnsNameRegistered(blk *types.Eth1Block, cache
metrics.TaskDuration.WithLabelValues("bt_transform_ens").Observe(time.Since(startTime).Seconds())
}()

bulkData = &types.BulkMutations{}
bulkMetadataUpdates = &types.BulkMutations{}
var ensCrontractAddresses map[string]string
switch bigtable.chainId {
case "1":
Expand All @@ -90,11 +92,9 @@ func (bigtable *Bigtable) TransformEnsNameRegistered(blk *types.Eth1Block, cache
case "11155111":
ensCrontractAddresses = ensContracts.ENSCrontractAddressesSepolia
default:
return nil, nil, nil
return bulkData, bulkMetadataUpdates, nil
}

bulkData = &types.BulkMutations{}
bulkMetadataUpdates = &types.BulkMutations{}
keys := make(map[string]bool)
ethLog := gethtypes.Log{}

Expand Down
4 changes: 2 additions & 2 deletions backend/pkg/commons/db/statistics.go
Original file line number Diff line number Diff line change
Expand Up @@ -1602,7 +1602,7 @@ func WriteExecutionChartSeriesForDay(day int64) error {
totalBlobCount = totalBlobCount.Add(decimal.NewFromInt(int64(len(tx.BlobVersionedHashes))))

default:
log.Fatal(fmt.Errorf("error unknown tx type %v hash: %x", tx.Status, tx.Hash), "", 0)
log.Fatal(fmt.Errorf("error unknown tx type %v hash: %x", tx.Type, tx.Hash), "", 0)
}
totalTxFees = totalTxFees.Add(txFees)

Expand All @@ -1611,7 +1611,7 @@ func WriteExecutionChartSeriesForDay(day int64) error {
failedTxCount += 1
totalFailedGasUsed = totalFailedGasUsed.Add(gasUsed)
totalFailedTxFee = totalFailedTxFee.Add(txFees)
case 1:
case 1, 2:
successTxCount += 1
default:
log.Fatal(fmt.Errorf("error unknown status code %v hash: %x", tx.Status, tx.Hash), "", 0)
Expand Down
Loading

0 comments on commit 2442614

Please sign in to comment.