From b8d12105e657fe83cfcc9a22da182a68bcbe9d1b Mon Sep 17 00:00:00 2001 From: Your Name Date: Thu, 23 Oct 2025 11:43:50 +0800 Subject: [PATCH] improve txn sender --- cmd/root.go | 2 + cmd/run.go | 4 +- lib/cmd/load/loader.go | 4 +- lib/cmd/run/ethlistener.go | 164 +++++++++++++++++++++++++++++++++++-- lib/cmd/run/run.go | 72 +++++++++++++++- lib/cmd/run/transmitter.go | 55 ++++++++++--- lib/generator/generator.go | 6 +- 7 files changed, 280 insertions(+), 27 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index a4a3e7b..dfc15e8 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -33,5 +33,7 @@ func Execute() { func init() { rootCmd.PersistentFlags().StringP("http-rpc", "", "http://127.0.0.1:8545", "RPC HTTP Endpoint") rootCmd.PersistentFlags().StringP("ws-rpc", "", "ws://127.0.0.1:8546", "RPC WS Endpoint") + rootCmd.PersistentFlags().StringP("rpc-file", "", "", "Path to a file that contains RPC HTTP endpoints, one per line starting from line 2") + rootCmd.PersistentFlags().StringP("cl-address", "", "", "CL address for validator") rootCmd.PersistentFlags().IntP("mempool", "", 5000, "Mempool size") } diff --git a/cmd/run.go b/cmd/run.go index 26b754a..a837ac9 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -16,14 +16,16 @@ var runCmd = &cobra.Command{ Long: "To run the benchmark", Run: func(cmd *cobra.Command, args []string) { httpRpc, _ := cmd.Flags().GetString("http-rpc") + rpcFile, _ := cmd.Flags().GetString("rpc-file") wsRpc, _ := cmd.Flags().GetString("ws-rpc") faucetPrivateKey, _ := cmd.Flags().GetString("faucet-private-key") + clAddress, _ := cmd.Flags().GetString("cl-address") senderCount, _ := cmd.Flags().GetInt("sender-count") txCount, _ := cmd.Flags().GetInt("tx-count") txType, _ := cmd.Flags().GetString("tx-type") mempool, _ := cmd.Flags().GetInt("mempool") - run.Run(httpRpc, wsRpc, faucetPrivateKey, senderCount, txCount, txType, mempool) + run.Run(httpRpc, rpcFile, wsRpc, faucetPrivateKey, clAddress, senderCount, txCount, txType, mempool) }, } diff --git a/lib/cmd/load/loader.go b/lib/cmd/load/loader.go index 10ac499..03ce240 100644 --- a/lib/cmd/load/loader.go +++ b/lib/cmd/load/loader.go @@ -41,7 +41,7 @@ func (l *Loader) LoadAndRun() error { } } - err = util.WaitForReceiptsOfTxs(client, txs, 20*time.Second) + err = util.WaitForReceiptsOfTxs(client, txs, 100*time.Second) if err != nil { return err } @@ -52,7 +52,7 @@ func (l *Loader) LoadAndRun() error { } // TODO, add limiter - transmitter, err := run.NewTransmitter(l.RpcUrl, nil) + transmitter, err := run.NewTransmitter([]string{l.RpcUrl}, nil) if err != nil { return err } diff --git a/lib/cmd/run/ethlistener.go b/lib/cmd/run/ethlistener.go index 4b25a05..1f557d3 100644 --- a/lib/cmd/run/ethlistener.go +++ b/lib/cmd/run/ethlistener.go @@ -6,6 +6,7 @@ import ( "log" "net/http" "strconv" + "time" limiterpkg "github.com/0glabs/evmchainbench/lib/limiter" "github.com/gorilla/websocket" @@ -20,19 +21,26 @@ type BlockInfo struct { type EthereumListener struct { wsURL string + clAddress string // New field for CL address conn *websocket.Conn + tmConn *websocket.Conn // New field for Tendermint WebSocket connection limiter *limiterpkg.RateLimiter blockStat []BlockInfo quit chan struct{} bestTPS int64 gasUsedAtBestTPS float64 + selfblocks map[int]bool // Track blocks proposed by CL validator + blockTxns map[int]int // Track transaction count for each block height } -func NewEthereumListener(wsURL string, limiter *limiterpkg.RateLimiter) *EthereumListener { +func NewEthereumListener(wsURL, clAddress string, limiter *limiterpkg.RateLimiter) *EthereumListener { return &EthereumListener{ - wsURL: wsURL, - limiter: limiter, - quit: make(chan struct{}), + wsURL: wsURL, + clAddress: clAddress, + limiter: limiter, + quit: make(chan struct{}), + selfblocks: make(map[int]bool), + blockTxns: make(map[int]int), } } @@ -45,6 +53,16 @@ func (el *EthereumListener) Connect() error { return nil } +// ConnectTendermint connects to the Tendermint WebSocket endpoint +func (el *EthereumListener) ConnectTendermint() error { + conn, _, err := websocket.DefaultDialer.Dial("ws://localhost:26657/websocket", http.Header{}) + if err != nil { + return fmt.Errorf("tendermint dial error: %v", err) + } + el.tmConn = conn + return nil +} + func (el *EthereumListener) SubscribeNewHeads() error { subscribeMsg := map[string]interface{}{ "jsonrpc": "2.0", @@ -62,6 +80,92 @@ func (el *EthereumListener) SubscribeNewHeads() error { return nil } +// SubscribeTendermintNewBlock subscribes to Tendermint NewBlock events +func (el *EthereumListener) SubscribeTendermintNewBlock() error { + subscribeMsg := map[string]interface{}{ + "jsonrpc": "2.0", + "method": "subscribe", + "params": []interface{}{"tm.event='NewBlock'"}, + "id": 1, + } + err := el.tmConn.WriteJSON(subscribeMsg) + if err != nil { + return fmt.Errorf("tendermint subscribe error: %v", err) + } + + go el.listenForTendermintMessages() + + return nil +} + +func (el *EthereumListener) listenForTendermintMessages() { + for { + _, message, err := el.tmConn.ReadMessage() + if err != nil { + log.Println("Tendermint WebSocket read error:", err) + return + } + + var response map[string]interface{} + err = json.Unmarshal(message, &response) + if err != nil { + log.Println("Tendermint unmarshal error:", err) + continue + } + + el.handleTendermintMessage(response) + } +} + +func (el *EthereumListener) handleTendermintMessage(response map[string]interface{}) { + // Get current time with millisecond precision + currentTime := time.Now().Format("2006-01-02 15:04:05.000") + + // Check if this is a NewBlock event + result, ok := response["result"] + if !ok { + return + } + + if data, ok := result.(map[string]interface{})["data"]; ok { + if dataMap, ok := data.(map[string]interface{}); ok { + if value, ok := dataMap["value"]; ok { + if valueMap, ok := value.(map[string]interface{}); ok { + if block, ok := valueMap["block"]; ok { + if blockMap, ok := block.(map[string]interface{}); ok { + if header, ok := blockMap["header"]; ok { + if headerMap, ok := header.(map[string]interface{}); ok { + if height, ok := headerMap["height"]; ok { + fmt.Printf("[%s] Tendermint NewBlock - Height: %v\n", currentTime, height) + + // Parse block height + heightInt, _ := strconv.ParseInt(height.(string), 10, 64) + + // Check if proposer_address matches CL validator address + if proposerAddress, ok := headerMap["proposer_address"]; ok { + if proposerStr, ok := proposerAddress.(string); ok { + if proposerStr == el.clAddress { + // Check if this block height has transaction data and increase limit + if txCount, exists := el.blockTxns[int(heightInt)]; exists { + el.limiter.IncreaseLimit(txCount) + } + // Add block height to selfblocks set + el.selfblocks[int(heightInt)] = true + fmt.Printf("[%s] Self-proposed block detected - Height: %d\n", currentTime, heightInt) + } + } + } + } + } + } + } + } + } + } + } + } +} + func (el *EthereumListener) listenForMessages() { for { _, message, err := el.conn.ReadMessage() @@ -78,6 +182,17 @@ func (el *EthereumListener) listenForMessages() { if method, ok := response["method"]; ok && method == "eth_subscription" { el.handleNewHead(response) + } else if id, ok := response["id"]; ok && id == float64(1) { + // Check if this is a txpool_status response + if result, ok := response["result"].(map[string]interface{}); ok { + if _, hasPending := result["pending"]; hasPending { + el.handleTxpoolStatus(response) + } else { + el.handleBlockResponse(response) + } + } else { + el.handleBlockResponse(response) + } } else { el.handleBlockResponse(response) } @@ -117,13 +232,44 @@ func (el *EthereumListener) handleNewHead(response map[string]interface{}) { } } +func (el *EthereumListener) handleTxpoolStatus(response map[string]interface{}) { + if result, ok := response["result"].(map[string]interface{}); ok { + pending, _ := strconv.ParseInt(result["pending"].(string), 10, 64) + queued, _ := strconv.ParseInt(result["queued"].(string), 10, 64) + + // Get current time with millisecond precision + currentTime := time.Now().Format("2006-01-02 15:04:05.000") + fmt.Printf("[%s] Mempool Status - Pending: %d, Queued: %d\n", currentTime, pending, queued) + } +} + func (el *EthereumListener) handleBlockResponse(response map[string]interface{}) { if result, ok := response["result"].(map[string]interface{}); ok { if txns, ok := result["transactions"].([]interface{}); ok { - el.limiter.IncreaseLimit(len(txns)) - ts, _ := strconv.ParseInt(result["timestamp"].(string)[2:], 16, 64) + systs := time.Now().Unix() + ts := systs + // fmt.Println("ts", ts, "systs", systs) gasUsed, _ := strconv.ParseInt(result["gasUsed"].(string)[2:], 16, 64) gasLimit, _ := strconv.ParseInt(result["gasLimit"].(string)[2:], 16, 64) + + // Get current time with millisecond precision + currentTime := time.Now().Format("2006-01-02 15:04:05.000") + // Get block number + blockNumber := result["number"].(string) + + // Record transaction count for this block height + blockHeight, _ := strconv.ParseInt(blockNumber[2:], 16, 64) // Remove "0x" prefix and parse as hex + + // Only increase limit if this block is in selfblocks (proposed by CL validator) + if el.selfblocks[int(blockHeight)] { + el.limiter.IncreaseLimit(len(txns)) + } + + el.blockTxns[int(blockHeight)] = len(txns) + + // Output current time, block number, and transaction count + fmt.Printf("[%s] Block: %s, Transactions: %d, GasUsed: %d, GasLimit: %d\n", currentTime, blockNumber, len(txns), gasUsed, gasLimit) + el.blockStat = append(el.blockStat, BlockInfo{ Time: ts, TxCount: int64(len(txns)), @@ -142,8 +288,9 @@ func (el *EthereumListener) handleBlockResponse(response map[string]interface{}) } } timeSpan := el.blockStat[len(el.blockStat)-1].Time - el.blockStat[0].Time + // fmt.Println("timeSpan", timeSpan) // calculate TPS and gas used percentage - if timeSpan > 50 { + if timeSpan > 10 { totalTxCount := int64(0) totalGasLimit := int64(0) totalGasUsed := int64(0) @@ -191,5 +338,8 @@ func (el *EthereumListener) Close() { if el.conn != nil { el.conn.Close() } + if el.tmConn != nil { + el.tmConn.Close() + } close(el.quit) } diff --git a/lib/cmd/run/run.go b/lib/cmd/run/run.go index 0b7dc44..697eda7 100644 --- a/lib/cmd/run/run.go +++ b/lib/cmd/run/run.go @@ -2,14 +2,66 @@ package run import ( "log" + "os" + "strings" "github.com/0glabs/evmchainbench/lib/generator" limiterpkg "github.com/0glabs/evmchainbench/lib/limiter" "github.com/ethereum/go-ethereum/core/types" ) -func Run(httpRpc, wsRpc, faucetPrivateKey string, senderCount, txCount int, txType string, mempool int) { - generator, err := generator.NewGenerator(httpRpc, faucetPrivateKey, senderCount, txCount, false, "") +func Run(httpRpc, rpcFile, wsRpc, faucetPrivateKey, clAddress string, senderCount, txCount int, txType string, mempool int) { + var rpcUrls []string + + if rpcFile != "" { + // Read RPC URLs from file (starting from line 2) + data, err := os.ReadFile(rpcFile) + if err != nil { + log.Fatalf("failed to read rpc file: %v", err) + } + + lines := strings.Split(strings.ReplaceAll(string(data), "\r\n", "\n"), "\n") + for i, line := range lines { + line = strings.TrimSpace(line) + if line == "" { + continue + } + + // Extract IP from lines like "user@ip" or "ip" + var ip string + if strings.Contains(line, "@") { + // Extract IP after @ symbol + parts := strings.Split(line, "@") + if len(parts) == 2 { + ip = strings.TrimSpace(parts[1]) + } else { + log.Printf("warning: invalid format in line %d: %s", i+1, line) + continue + } + } else { + // Assume the line is already an IP + ip = line + } + + // Convert IP to full RPC URL + rpcUrl := "http://" + ip + ":8545" + rpcUrls = append(rpcUrls, rpcUrl) + } + } else { + rpcUrls = strings.Split(httpRpc, ",") + } + + if len(rpcUrls) == 0 { + log.Fatalf("no rpc urls provided: either specify --http-rpc or --rpc-file") + } + + /*if len(rpcUrls) < senderCount { + log.Fatalf("insufficient rpc urls: have %d, need at least %d", len(rpcUrls), senderCount) + }*/ + + primaryRpcUrl := rpcUrls[0] + + generator, err := generator.NewGenerator(primaryRpcUrl, faucetPrivateKey, senderCount, txCount, false, "") if err != nil { log.Fatalf("Failed to create generator: %v", err) } @@ -32,19 +84,31 @@ func Run(httpRpc, wsRpc, faucetPrivateKey string, senderCount, txCount int, txTy limiter := limiterpkg.NewRateLimiter(mempool) - ethListener := NewEthereumListener(wsRpc, limiter) + ethListener := NewEthereumListener(wsRpc, clAddress, limiter) err = ethListener.Connect() if err != nil { log.Fatalf("Failed to connect to WebSocket: %v", err) } + // Connect to Tendermint WebSocket + err = ethListener.ConnectTendermint() + if err != nil { + log.Fatalf("Failed to connect to Tendermint WebSocket: %v", err) + } + // Subscribe new heads err = ethListener.SubscribeNewHeads() if err != nil { log.Fatalf("Failed to subscribe to new heads: %v", err) } - transmitter, err := NewTransmitter(httpRpc, limiter) + // Subscribe to Tendermint NewBlock events + err = ethListener.SubscribeTendermintNewBlock() + if err != nil { + log.Fatalf("Failed to subscribe to Tendermint NewBlock events: %v", err) + } + + transmitter, err := NewTransmitter(rpcUrls, limiter) if err != nil { log.Fatalf("Failed to create transmitter: %v", err) } diff --git a/lib/cmd/run/transmitter.go b/lib/cmd/run/transmitter.go index cd02e15..1af1a70 100644 --- a/lib/cmd/run/transmitter.go +++ b/lib/cmd/run/transmitter.go @@ -2,6 +2,7 @@ package run import ( "context" + "fmt" "time" "github.com/ethereum/go-ethereum/core/types" @@ -11,23 +12,58 @@ import ( ) type Transmitter struct { - RpcUrl string + // RpcUrls is a collection of RPC endpoints that the transmitter could use + // to broadcast transactions. Senders are distributed across RPC endpoints + // using round-robin fashion. For example, with 8 senders and 4 RPC endpoints, + // each RPC endpoint will handle 2 senders. + RpcUrls []string limiter *limiterpkg.RateLimiter } -func NewTransmitter(rpcUrl string, limiter *limiterpkg.RateLimiter) (*Transmitter, error) { +// NewTransmitter creates a new transmitter instance. It accepts one or many +// RPC endpoints. Passing an empty slice will result in an error. +func NewTransmitter(rpcUrls []string, limiter *limiterpkg.RateLimiter) (*Transmitter, error) { + if len(rpcUrls) == 0 { + return nil, fmt.Errorf("no rpc url provided") + } + return &Transmitter{ - RpcUrl: rpcUrl, + RpcUrls: rpcUrls, limiter: limiter, }, nil } func (t *Transmitter) Broadcast(txsMap map[int]types.Transactions) error { + // Validate: sender count should be a multiple of RPC count for even distribution + senderCount := len(txsMap) + rpcCount := len(t.RpcUrls) + + if senderCount == 0 { + return fmt.Errorf("no transactions to broadcast") + } + + if rpcCount == 0 { + return fmt.Errorf("no rpc urls provided") + } + ch := make(chan error) - for _, txs := range txsMap { - go func(txs []*types.Transaction) { - client, err := ethclient.Dial(t.RpcUrl) + // Iterate through sender indices sequentially to achieve stable mapping between + // sender index and RPC endpoint. Ranging over a map returns keys in random + // order, so using a deterministic loop avoids mismatches across runs. + for senderIndex := 0; senderIndex < senderCount; senderIndex++ { + txs, ok := txsMap[senderIndex] + if !ok { + ch <- fmt.Errorf("txsMap missing sender index %d", senderIndex) + continue + } + + // Select RPC endpoint using round-robin distribution + // Each sender gets a dedicated RPC endpoint, but multiple senders can share the same RPC + rpcUrl := t.RpcUrls[senderIndex%rpcCount] + + go func(rpcUrl string, txs []*types.Transaction) { + client, err := ethclient.Dial(rpcUrl) if err != nil { ch <- err return @@ -42,17 +78,16 @@ func (t *Transmitter) Broadcast(txsMap map[int]types.Transactions) error { return } break - } else { - time.Sleep(10 * time.Millisecond) } + time.Sleep(10 * time.Millisecond) } } + ch <- nil - }(txs) + }(rpcUrl, txs) } - senderCount := len(txsMap) for i := 0; i < senderCount; i++ { err := <-ch if err != nil { diff --git a/lib/generator/generator.go b/lib/generator/generator.go index eb81007..b0af56f 100644 --- a/lib/generator/generator.go +++ b/lib/generator/generator.go @@ -131,7 +131,7 @@ func (g *Generator) approveERC20(token common.Address, spender common.Address) { txs = append(txs, tx) } - err = util.WaitForReceiptsOfTxs(client, txs, 20*time.Second) + err = util.WaitForReceiptsOfTxs(client, txs, 100*time.Second) if err != nil { panic(err) } @@ -170,7 +170,7 @@ func (g *Generator) prepareERC20(contractAddressStr string) { txs = append(txs, tx) } - err = util.WaitForReceiptsOfTxs(client, txs, 20*time.Second) + err = util.WaitForReceiptsOfTxs(client, txs, 100*time.Second) if err != nil { panic(err) } @@ -206,7 +206,7 @@ func (g *Generator) prepareSenders() { txs = append(txs, signedTx) } - err = util.WaitForReceiptsOfTxs(client, txs, 20*time.Second) + err = util.WaitForReceiptsOfTxs(client, txs, 100*time.Second) if err != nil { panic(err) }