Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,7 @@ func Execute() {
func init() {
rootCmd.PersistentFlags().StringP("http-rpc", "", "http://127.0.0.1:8545", "RPC HTTP Endpoint")
rootCmd.PersistentFlags().StringP("ws-rpc", "", "ws://127.0.0.1:8546", "RPC WS Endpoint")
rootCmd.PersistentFlags().StringP("rpc-file", "", "", "Path to a file that contains RPC HTTP endpoints, one per line starting from line 2")
rootCmd.PersistentFlags().StringP("cl-address", "", "", "CL address for validator")
rootCmd.PersistentFlags().IntP("mempool", "", 5000, "Mempool size")
}
4 changes: 3 additions & 1 deletion cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@ var runCmd = &cobra.Command{
Long: "To run the benchmark",
Run: func(cmd *cobra.Command, args []string) {
httpRpc, _ := cmd.Flags().GetString("http-rpc")
rpcFile, _ := cmd.Flags().GetString("rpc-file")
wsRpc, _ := cmd.Flags().GetString("ws-rpc")
faucetPrivateKey, _ := cmd.Flags().GetString("faucet-private-key")
clAddress, _ := cmd.Flags().GetString("cl-address")
senderCount, _ := cmd.Flags().GetInt("sender-count")
txCount, _ := cmd.Flags().GetInt("tx-count")
txType, _ := cmd.Flags().GetString("tx-type")
mempool, _ := cmd.Flags().GetInt("mempool")

run.Run(httpRpc, wsRpc, faucetPrivateKey, senderCount, txCount, txType, mempool)
run.Run(httpRpc, rpcFile, wsRpc, faucetPrivateKey, clAddress, senderCount, txCount, txType, mempool)
},
}

Expand Down
4 changes: 2 additions & 2 deletions lib/cmd/load/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (l *Loader) LoadAndRun() error {
}
}

err = util.WaitForReceiptsOfTxs(client, txs, 20*time.Second)
err = util.WaitForReceiptsOfTxs(client, txs, 100*time.Second)
if err != nil {
return err
}
Expand All @@ -52,7 +52,7 @@ func (l *Loader) LoadAndRun() error {
}

// TODO, add limiter
transmitter, err := run.NewTransmitter(l.RpcUrl, nil)
transmitter, err := run.NewTransmitter([]string{l.RpcUrl}, nil)
if err != nil {
return err
}
Expand Down
164 changes: 157 additions & 7 deletions lib/cmd/run/ethlistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log"
"net/http"
"strconv"
"time"

limiterpkg "github.com/0glabs/evmchainbench/lib/limiter"
"github.com/gorilla/websocket"
Expand All @@ -20,19 +21,26 @@ type BlockInfo struct {

type EthereumListener struct {
wsURL string
clAddress string // New field for CL address
conn *websocket.Conn
tmConn *websocket.Conn // New field for Tendermint WebSocket connection
limiter *limiterpkg.RateLimiter
blockStat []BlockInfo
quit chan struct{}
bestTPS int64
gasUsedAtBestTPS float64
selfblocks map[int]bool // Track blocks proposed by CL validator
blockTxns map[int]int // Track transaction count for each block height
}

func NewEthereumListener(wsURL string, limiter *limiterpkg.RateLimiter) *EthereumListener {
func NewEthereumListener(wsURL, clAddress string, limiter *limiterpkg.RateLimiter) *EthereumListener {
return &EthereumListener{
wsURL: wsURL,
limiter: limiter,
quit: make(chan struct{}),
wsURL: wsURL,
clAddress: clAddress,
limiter: limiter,
quit: make(chan struct{}),
selfblocks: make(map[int]bool),
blockTxns: make(map[int]int),
}
}

Expand All @@ -45,6 +53,16 @@ func (el *EthereumListener) Connect() error {
return nil
}

// ConnectTendermint connects to the Tendermint WebSocket endpoint
func (el *EthereumListener) ConnectTendermint() error {
conn, _, err := websocket.DefaultDialer.Dial("ws://localhost:26657/websocket", http.Header{})
if err != nil {
return fmt.Errorf("tendermint dial error: %v", err)
}
el.tmConn = conn
return nil
}

func (el *EthereumListener) SubscribeNewHeads() error {
subscribeMsg := map[string]interface{}{
"jsonrpc": "2.0",
Expand All @@ -62,6 +80,92 @@ func (el *EthereumListener) SubscribeNewHeads() error {
return nil
}

// SubscribeTendermintNewBlock subscribes to Tendermint NewBlock events
func (el *EthereumListener) SubscribeTendermintNewBlock() error {
subscribeMsg := map[string]interface{}{
"jsonrpc": "2.0",
"method": "subscribe",
"params": []interface{}{"tm.event='NewBlock'"},
"id": 1,
}
err := el.tmConn.WriteJSON(subscribeMsg)
if err != nil {
return fmt.Errorf("tendermint subscribe error: %v", err)
}

go el.listenForTendermintMessages()

return nil
}

func (el *EthereumListener) listenForTendermintMessages() {
for {
_, message, err := el.tmConn.ReadMessage()
if err != nil {
log.Println("Tendermint WebSocket read error:", err)
return
}

var response map[string]interface{}
err = json.Unmarshal(message, &response)
if err != nil {
log.Println("Tendermint unmarshal error:", err)
continue
}

el.handleTendermintMessage(response)
}
}

func (el *EthereumListener) handleTendermintMessage(response map[string]interface{}) {
// Get current time with millisecond precision
currentTime := time.Now().Format("2006-01-02 15:04:05.000")

// Check if this is a NewBlock event
result, ok := response["result"]
if !ok {
return
}

if data, ok := result.(map[string]interface{})["data"]; ok {
if dataMap, ok := data.(map[string]interface{}); ok {
if value, ok := dataMap["value"]; ok {
if valueMap, ok := value.(map[string]interface{}); ok {
if block, ok := valueMap["block"]; ok {
if blockMap, ok := block.(map[string]interface{}); ok {
if header, ok := blockMap["header"]; ok {
if headerMap, ok := header.(map[string]interface{}); ok {
if height, ok := headerMap["height"]; ok {
fmt.Printf("[%s] Tendermint NewBlock - Height: %v\n", currentTime, height)

// Parse block height
heightInt, _ := strconv.ParseInt(height.(string), 10, 64)

// Check if proposer_address matches CL validator address
if proposerAddress, ok := headerMap["proposer_address"]; ok {
if proposerStr, ok := proposerAddress.(string); ok {
if proposerStr == el.clAddress {
// Check if this block height has transaction data and increase limit
if txCount, exists := el.blockTxns[int(heightInt)]; exists {
el.limiter.IncreaseLimit(txCount)
}
// Add block height to selfblocks set
el.selfblocks[int(heightInt)] = true
fmt.Printf("[%s] Self-proposed block detected - Height: %d\n", currentTime, heightInt)
}
}
}
}
}
}
}
}
}
}
}
}
}

func (el *EthereumListener) listenForMessages() {
for {
_, message, err := el.conn.ReadMessage()
Expand All @@ -78,6 +182,17 @@ func (el *EthereumListener) listenForMessages() {

if method, ok := response["method"]; ok && method == "eth_subscription" {
el.handleNewHead(response)
} else if id, ok := response["id"]; ok && id == float64(1) {
// Check if this is a txpool_status response
if result, ok := response["result"].(map[string]interface{}); ok {
if _, hasPending := result["pending"]; hasPending {
el.handleTxpoolStatus(response)
} else {
el.handleBlockResponse(response)
}
} else {
el.handleBlockResponse(response)
}
} else {
el.handleBlockResponse(response)
}
Expand Down Expand Up @@ -117,13 +232,44 @@ func (el *EthereumListener) handleNewHead(response map[string]interface{}) {
}
}

func (el *EthereumListener) handleTxpoolStatus(response map[string]interface{}) {
if result, ok := response["result"].(map[string]interface{}); ok {
pending, _ := strconv.ParseInt(result["pending"].(string), 10, 64)
queued, _ := strconv.ParseInt(result["queued"].(string), 10, 64)

// Get current time with millisecond precision
currentTime := time.Now().Format("2006-01-02 15:04:05.000")
fmt.Printf("[%s] Mempool Status - Pending: %d, Queued: %d\n", currentTime, pending, queued)
}
}

func (el *EthereumListener) handleBlockResponse(response map[string]interface{}) {
if result, ok := response["result"].(map[string]interface{}); ok {
if txns, ok := result["transactions"].([]interface{}); ok {
el.limiter.IncreaseLimit(len(txns))
ts, _ := strconv.ParseInt(result["timestamp"].(string)[2:], 16, 64)
systs := time.Now().Unix()
ts := systs
// fmt.Println("ts", ts, "systs", systs)
gasUsed, _ := strconv.ParseInt(result["gasUsed"].(string)[2:], 16, 64)
gasLimit, _ := strconv.ParseInt(result["gasLimit"].(string)[2:], 16, 64)

// Get current time with millisecond precision
currentTime := time.Now().Format("2006-01-02 15:04:05.000")
// Get block number
blockNumber := result["number"].(string)

// Record transaction count for this block height
blockHeight, _ := strconv.ParseInt(blockNumber[2:], 16, 64) // Remove "0x" prefix and parse as hex

// Only increase limit if this block is in selfblocks (proposed by CL validator)
if el.selfblocks[int(blockHeight)] {
el.limiter.IncreaseLimit(len(txns))
}

el.blockTxns[int(blockHeight)] = len(txns)

// Output current time, block number, and transaction count
fmt.Printf("[%s] Block: %s, Transactions: %d, GasUsed: %d, GasLimit: %d\n", currentTime, blockNumber, len(txns), gasUsed, gasLimit)

el.blockStat = append(el.blockStat, BlockInfo{
Time: ts,
TxCount: int64(len(txns)),
Expand All @@ -142,8 +288,9 @@ func (el *EthereumListener) handleBlockResponse(response map[string]interface{})
}
}
timeSpan := el.blockStat[len(el.blockStat)-1].Time - el.blockStat[0].Time
// fmt.Println("timeSpan", timeSpan)
// calculate TPS and gas used percentage
if timeSpan > 50 {
if timeSpan > 10 {
totalTxCount := int64(0)
totalGasLimit := int64(0)
totalGasUsed := int64(0)
Expand Down Expand Up @@ -191,5 +338,8 @@ func (el *EthereumListener) Close() {
if el.conn != nil {
el.conn.Close()
}
if el.tmConn != nil {
el.tmConn.Close()
}
close(el.quit)
}
72 changes: 68 additions & 4 deletions lib/cmd/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,66 @@ package run

import (
"log"
"os"
"strings"

"github.com/0glabs/evmchainbench/lib/generator"
limiterpkg "github.com/0glabs/evmchainbench/lib/limiter"
"github.com/ethereum/go-ethereum/core/types"
)

func Run(httpRpc, wsRpc, faucetPrivateKey string, senderCount, txCount int, txType string, mempool int) {
generator, err := generator.NewGenerator(httpRpc, faucetPrivateKey, senderCount, txCount, false, "")
func Run(httpRpc, rpcFile, wsRpc, faucetPrivateKey, clAddress string, senderCount, txCount int, txType string, mempool int) {
var rpcUrls []string

if rpcFile != "" {
// Read RPC URLs from file (starting from line 2)
data, err := os.ReadFile(rpcFile)
if err != nil {
log.Fatalf("failed to read rpc file: %v", err)
}

lines := strings.Split(strings.ReplaceAll(string(data), "\r\n", "\n"), "\n")
for i, line := range lines {
line = strings.TrimSpace(line)
if line == "" {
continue
}

// Extract IP from lines like "user@ip" or "ip"
var ip string
if strings.Contains(line, "@") {
// Extract IP after @ symbol
parts := strings.Split(line, "@")
if len(parts) == 2 {
ip = strings.TrimSpace(parts[1])
} else {
log.Printf("warning: invalid format in line %d: %s", i+1, line)
continue
}
} else {
// Assume the line is already an IP
ip = line
}

// Convert IP to full RPC URL
rpcUrl := "http://" + ip + ":8545"
rpcUrls = append(rpcUrls, rpcUrl)
}
} else {
rpcUrls = strings.Split(httpRpc, ",")
}

if len(rpcUrls) == 0 {
log.Fatalf("no rpc urls provided: either specify --http-rpc or --rpc-file")
}

/*if len(rpcUrls) < senderCount {
log.Fatalf("insufficient rpc urls: have %d, need at least %d", len(rpcUrls), senderCount)
}*/

primaryRpcUrl := rpcUrls[0]

generator, err := generator.NewGenerator(primaryRpcUrl, faucetPrivateKey, senderCount, txCount, false, "")
if err != nil {
log.Fatalf("Failed to create generator: %v", err)
}
Expand All @@ -32,19 +84,31 @@ func Run(httpRpc, wsRpc, faucetPrivateKey string, senderCount, txCount int, txTy

limiter := limiterpkg.NewRateLimiter(mempool)

ethListener := NewEthereumListener(wsRpc, limiter)
ethListener := NewEthereumListener(wsRpc, clAddress, limiter)
err = ethListener.Connect()
if err != nil {
log.Fatalf("Failed to connect to WebSocket: %v", err)
}

// Connect to Tendermint WebSocket
err = ethListener.ConnectTendermint()
if err != nil {
log.Fatalf("Failed to connect to Tendermint WebSocket: %v", err)
}

// Subscribe new heads
err = ethListener.SubscribeNewHeads()
if err != nil {
log.Fatalf("Failed to subscribe to new heads: %v", err)
}

transmitter, err := NewTransmitter(httpRpc, limiter)
// Subscribe to Tendermint NewBlock events
err = ethListener.SubscribeTendermintNewBlock()
if err != nil {
log.Fatalf("Failed to subscribe to Tendermint NewBlock events: %v", err)
}

transmitter, err := NewTransmitter(rpcUrls, limiter)
if err != nil {
log.Fatalf("Failed to create transmitter: %v", err)
}
Expand Down
Loading
Loading