From 47ced5f72e38aac89152a3d010ed0c6ababd1dc7 Mon Sep 17 00:00:00 2001 From: "allen.wu" Date: Thu, 26 Mar 2026 18:46:52 +0800 Subject: [PATCH 1/2] feat: StateV2 role state machine + P2P signature verification - StateV2: roleCheckRoutine for dynamic role detection, SequencerHA interface - ApplyBlock: full mutex serialization + idempotent height check - VerifyBlockSignature: fail-close, all V2 blocks must have valid signatures - SignatureStore: independent block signature persistence (LevelDB) - BroadcastReactor: unified signature verification, SyncReq tracking, HasSigner filter - Interfaces: SequencerVerifier, Signer, SequencerHA Co-Authored-By: Claude Opus 4.6 (1M context) --- blocksync/reactor.go | 45 ++++- node/node.go | 23 ++- sequencer/block_verify.go | 35 ++++ sequencer/broadcast_reactor.go | 310 ++++++++++++++++++++--------- sequencer/interfaces.go | 36 +++- sequencer/signature_store.go | 51 +++++ sequencer/state_v2.go | 236 ++++++++++++++-------- sequencer/state_v2_test.go | 350 +++++++++++++++++++++++++++------ 8 files changed, 841 insertions(+), 245 deletions(-) create mode 100644 sequencer/block_verify.go create mode 100644 sequencer/signature_store.go diff --git a/blocksync/reactor.go b/blocksync/reactor.go index 980d3b048d3..2d170f29a4f 100644 --- a/blocksync/reactor.go +++ b/blocksync/reactor.go @@ -11,6 +11,7 @@ import ( "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/p2p" bcproto "github.com/tendermint/tendermint/proto/tendermint/blocksync" + "github.com/tendermint/tendermint/sequencer" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/store" "github.com/tendermint/tendermint/types" @@ -76,6 +77,10 @@ type Reactor struct { requestsCh <-chan BlockRequest errorsCh <-chan peerError + + // Sequencer signature verification (set after upgrade via SetVerifier/SetSigStore) + verifier sequencer.SequencerVerifier + sigStore *sequencer.SignatureStore } // NewReactor returns new reactor instance. @@ -141,6 +146,16 @@ func (bcR *Reactor) SetStateV2(stateV2 SequencerState) { bcR.stateV2 = stateV2 } +// SetVerifier sets the sequencer verifier for block signature validation. +func (bcR *Reactor) SetVerifier(v sequencer.SequencerVerifier) { + bcR.verifier = v +} + +// SetSigStore sets the signature store for persisting/attaching block signatures. +func (bcR *Reactor) SetSigStore(s *sequencer.SignatureStore) { + bcR.sigStore = s +} + // Pool returns the block pool for broadcast reactor to check peer heights. func (bcR *Reactor) Pool() *BlockPool { return bcR.pool @@ -271,6 +286,19 @@ func (bcR *Reactor) respondToPeerV2(msg *bcproto.BlockRequest, src p2p.Peer) boo return src.TrySend(BlocksyncChannel, msgBytes) } + // Attach signature from store; if unavailable, refuse to serve + sig, err := bcR.sigStore.GetSignature(blockData.Hash) + if err != nil { + bcR.Logger.Error("No signature available for block, refusing to serve", + "height", msg.Height, "err", err) + msgBytes, encErr := EncodeMsg(&bcproto.NoBlockResponse{Height: msg.Height}) + if encErr != nil { + return false + } + return src.TrySend(BlocksyncChannel, msgBytes) + } + blockData.Signature = sig + bcR.Logger.Debug("respondToPeerV2: got block from geth", "height", msg.Height, "hash", blockData.Hash.Hex()) @@ -293,8 +321,7 @@ func (bcR *Reactor) L2Node() l2node.L2Node { } // syncBlockV2 handles syncing a single BlockV2 in sequencer mode. -// No signature verification during sync - only broadcast channel verifies signatures. -// Returns true if sync was successful, false if there was an error (already handled). +// Verifies block signature using the history-aware verifier, then applies. func (bcR *Reactor) syncBlockV2(block types.SyncableBlock, blocksSynced *uint64, lastRate *float64, lastHundred *time.Time) bool { blockV2, ok := block.(*types.BlockV2) if !ok { @@ -303,13 +330,25 @@ func (bcR *Reactor) syncBlockV2(block types.SyncableBlock, blocksSynced *uint64, return false } - // Apply BlockV2 via stateV2 (no signature verification during sync) + // Verify block signature (uses IsSequencerAt with history-aware lookup) + if err := sequencer.VerifyBlockSignature(bcR.verifier, blockV2); err != nil { + bcR.Logger.Error("Block signature verification failed", "height", blockV2.Number, "err", err) + bcR.pool.RedoRequest(blockV2.GetHeight()) + return false + } + + // Apply BlockV2 via stateV2 if err := bcR.stateV2.ApplyBlock(blockV2); err != nil { bcR.Logger.Error("Failed to apply BlockV2", "height", blockV2.Number, "err", err) bcR.pool.RedoRequest(blockV2.GetHeight()) return false } + // Persist signature for historical serving + if err := bcR.sigStore.SaveSignature(blockV2.Hash, blockV2.Signature); err != nil { + panic(fmt.Sprintf("failed to save signature at height %d: %v", blockV2.Number, err)) + } + bcR.pool.PopRequest() *blocksSynced++ diff --git a/node/node.go b/node/node.go index 8daca33aa92..7577ca81081 100644 --- a/node/node.go +++ b/node/node.go @@ -252,7 +252,7 @@ type Node struct { blockBroadcastReactor *sequencer.BlockBroadcastReactor } -func initDBs(config *cfg.Config, dbProvider DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) { +func initDBs(config *cfg.Config, dbProvider DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, sigStore *sequencer.SignatureStore, err error) { var blockStoreDB dbm.DB blockStoreDB, err = dbProvider(&DBContext{"blockstore", config}) if err != nil { @@ -265,6 +265,14 @@ func initDBs(config *cfg.Config, dbProvider DBProvider) (blockStore *store.Block return } + // TODO: add a new store, notify the-3rd parties to integrate + var sigDB dbm.DB + sigDB, err = dbProvider(&DBContext{"signatures", config}) + if err != nil { + return + } + sigStore = sequencer.NewSignatureStore(sigDB) + return } @@ -509,6 +517,8 @@ func createSequencerComponents( logger log.Logger, verifier sequencer.SequencerVerifier, signer sequencer.Signer, + sigStore *sequencer.SignatureStore, + ha sequencer.SequencerHA, ) (*sequencer.StateV2, *sequencer.BlockBroadcastReactor, error) { // Create StateV2 stateV2, err := sequencer.NewStateV2( @@ -517,6 +527,8 @@ func createSequencerComponents( logger, verifier, signer, + sigStore, + ha, ) if err != nil { return nil, nil, fmt.Errorf("failed to create StateV2: %w", err) @@ -529,6 +541,7 @@ func createSequencerComponents( waitSync, logger, verifier, + sigStore, ) broadcastReactor.SetLogger(logger.With("module", "sequencer")) @@ -789,7 +802,7 @@ func NewNode( ) ( *Node, error, ) { - blockStore, stateDB, err := initDBs(config, dbProvider) + blockStore, stateDB, sigStore, err := initDBs(config, dbProvider) if err != nil { return nil, err } @@ -1016,12 +1029,16 @@ func NewNode( logger, sequencerVerifier, sequencerSigner, + sigStore, + nil, // ha: nil for now, Raft HA will be injected in a future milestone ); err != nil { return nil, err } - // Set stateV2 on blocksync reactor for post-upgrade sync + // Set stateV2&verifier&sigStore on blocksync reactor for post-upgrade bcR.SetStateV2(node.stateV2) + bcR.SetVerifier(sequencerVerifier) + bcR.SetSigStore(sigStore) // Register BlockBroadcastReactor with Switch sw.AddReactor("SEQUENCER", node.blockBroadcastReactor) diff --git a/sequencer/block_verify.go b/sequencer/block_verify.go new file mode 100644 index 00000000000..d0b693c4add --- /dev/null +++ b/sequencer/block_verify.go @@ -0,0 +1,35 @@ +package sequencer + +import ( + "fmt" + + "github.com/morph-l2/go-ethereum/crypto" +) + +// VerifyBlockSignature verifies a block's ECDSA signature against the +// expected sequencer at that block's height. All V2 blocks must carry a valid signature. +func VerifyBlockSignature(verifier SequencerVerifier, block *BlockV2) error { + if verifier == nil { + return fmt.Errorf("%w: verifier not configured", ErrInvalidSignature) + } + + if len(block.Signature) == 0 { + return fmt.Errorf("%w: missing signature at height %d", ErrInvalidSignature, block.Number) + } + + pubKey, err := crypto.SigToPub(block.Hash.Bytes(), block.Signature) + if err != nil { + return fmt.Errorf("%w: recover pubkey at height %d: %v", ErrInvalidSignature, block.Number, err) + } + signer := crypto.PubkeyToAddress(*pubKey) + + ok, err := verifier.IsSequencerAt(signer, block.Number) + if err != nil { + return fmt.Errorf("IsSequencerAt height %d: %w", block.Number, err) + } + if !ok { + return fmt.Errorf("%w: signer %s is not sequencer at height %d", + ErrInvalidSignature, signer.Hex(), block.Number) + } + return nil +} diff --git a/sequencer/broadcast_reactor.go b/sequencer/broadcast_reactor.go index 7398a3feebb..b75174fd154 100644 --- a/sequencer/broadcast_reactor.go +++ b/sequencer/broadcast_reactor.go @@ -1,8 +1,6 @@ package sequencer import ( - "context" - "errors" "fmt" "math/big" "math/rand" @@ -12,7 +10,6 @@ import ( "github.com/cosmos/gogoproto/proto" "github.com/morph-l2/go-ethereum/common" - "github.com/morph-l2/go-ethereum/crypto" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/p2p" @@ -32,8 +29,20 @@ const ( peerSentCapacity = 500 // Per-peer sent tracking applyInterval = 10 * time.Second syncInterval = 10 * time.Second + + // Sync request tracking + syncRequestTTL = 20 * time.Second + maxPendingSyncRequests = 256 + maxPendingSyncPerPeer = 64 ) +// SyncReq represents a pending sync channel request. +type SyncReq struct { + Height int64 + PeerID p2p.ID + ExpireAt time.Time +} + // BlockPool interface (avoids import cycle) type BlockPool interface { MaxPeerHeight() int64 @@ -56,11 +65,24 @@ type BlockBroadcastReactor struct { seenBlocks *HashSet // Blocks we've seen (dedup) peerSent *PeerHashSet // Blocks sent to each peer - applyMtx sync.Mutex // Protects applyBlock to ensure sequential block application - sequencerStarted bool // True when sequencer mode is actually running (not just registered) + // applyMtx serializes the "check parent + apply + save signature" sequence + // in the reactor's applyBlock path. StateV2.ApplyBlock has its own mutex + // for the actual apply, but applyMtx ensures the parent-hash check and + // signature persistence are also atomic with the apply. + applyMtx sync.Mutex + startMu sync.Mutex + sequencerStarted bool logger log.Logger verifier SequencerVerifier + sigStore *SignatureStore + + // syncRequests tracks pending sync channel requests, keyed by height. + // Used to reject unsolicited responses before decode/verification. + // syncPeerCounts is a secondary index for O(1) per-peer count lookup. + syncRequestsMu sync.Mutex + syncRequests map[int64]*SyncReq + syncPeerCounts map[p2p.ID]int } // NewBlockBroadcastReactor creates a new reactor. @@ -70,17 +92,21 @@ func NewBlockBroadcastReactor( waitSync bool, logger log.Logger, verifier SequencerVerifier, + sigStore *SignatureStore, ) *BlockBroadcastReactor { r := &BlockBroadcastReactor{ - pool: pool, - stateV2: stateV2, - waitSync: waitSync, - recentBlocks: NewBlockRingBuffer(recentBlocksCapacity), - pendingCache: NewPendingBlockCache(), - seenBlocks: NewHashSet(seenBlocksCapacity), - peerSent: NewPeerHashSet(peerSentCapacity), - logger: logger.With("module", "broadcastReactor"), - verifier: verifier, + pool: pool, + stateV2: stateV2, + waitSync: waitSync, + recentBlocks: NewBlockRingBuffer(recentBlocksCapacity), + pendingCache: NewPendingBlockCache(), + seenBlocks: NewHashSet(seenBlocksCapacity), + peerSent: NewPeerHashSet(peerSentCapacity), + syncRequests: make(map[int64]*SyncReq), + syncPeerCounts: make(map[p2p.ID]int), + logger: logger.With("module", "broadcastReactor"), + verifier: verifier, + sigStore: sigStore, } r.BaseReactor = *p2p.NewBaseReactor("BlockBroadcast", r) return r @@ -101,8 +127,10 @@ func (r *BlockBroadcastReactor) OnStart() error { } func (r *BlockBroadcastReactor) StartSequencerRoutines() error { + r.startMu.Lock() + defer r.startMu.Unlock() + if r.sequencerStarted { - r.logger.Error("Sequencer routines already started, skipping") return nil } @@ -114,7 +142,11 @@ func (r *BlockBroadcastReactor) StartSequencerRoutines() error { } } - if r.stateV2.IsSequencerMode() { + // Fullnode (no signer): only applyRoutine (P2P sync) + // Nodes with signer (ActiveSeq / HA-Leader / HA-Follower): only broadcastRoutine + // - roleCheckRoutine is started by StateV2.OnStart() + // - applyRoutine is NOT started (sequencer produces, HA uses Raft) + if r.stateV2.HasSigner() { go r.broadcastRoutine() } else { go r.applyRoutine() @@ -141,6 +173,7 @@ func (r *BlockBroadcastReactor) AddPeer(peer p2p.Peer) { func (r *BlockBroadcastReactor) RemovePeer(peer p2p.Peer, reason interface{}) { r.peerSent.RemovePeer(string(peer.ID())) + r.removeSyncRequestsByPeer(peer.ID()) } func (r *BlockBroadcastReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { @@ -173,7 +206,7 @@ func (r *BlockBroadcastReactor) handleBroadcastMsg(msg interface{}, src p2p.Peer r.logger.Error("Invalid BlockV2", "err", err) return } - r.onBlockV2(blockV2, src, true) // verify signature + r.onBlockV2(blockV2, src, true) // from broadcast channel r.logger.Debug("handleBroadcastMsg", "src", src, "height", blockV2.Number, "hash", blockV2.Hash.Hex()) } default: @@ -188,15 +221,26 @@ func (r *BlockBroadcastReactor) handleSyncMsg(msg interface{}, src p2p.Peer) { r.logger.Debug("handleSyncMsg block request", "msg", msg, "src", src, "height", msg.Height) case *bcproto.BlockResponseV2: if msg.Block != nil { + // Check request before full decode to reject unsolicited responses early + height := int64(msg.Block.Number) + if !r.checkAndTakeSyncRequest(src.ID(), height) { + r.logger.Error("Unsolicited sync response, dropping", + "peer", src.ID(), "height", height) + return + } blockV2, err := types.BlockV2FromProto(msg.Block) if err != nil { r.logger.Error("Invalid BlockV2", "err", err) return } - r.onBlockV2(blockV2, src, false) // no signature verification + r.onBlockV2(blockV2, src, false) // from sync channel r.logger.Debug("handleSyncMsg block response", "src", src, "height", blockV2.Number, "hash", blockV2.Hash.Hex()) } case *bcproto.NoBlockResponse: + if !r.checkAndTakeSyncRequest(src.ID(), msg.Height) { + r.logger.Error("Unsolicited NoBlockResponse, dropping", "peer", src.ID(), "height", msg.Height) + return + } r.logger.Debug("Peer does not have requested block", "peer", src, "height", msg.Height) default: r.logger.Debug("Ignoring unknown message on sync channel", "type", reflect.TypeOf(msg)) @@ -207,14 +251,29 @@ func (r *BlockBroadcastReactor) handleSyncMsg(msg interface{}, src p2p.Peer) { // Routines // ============================================================================ -// broadcastRoutine: listen to stateV2 and broadcast new blocks +// broadcastRoutine: listen to block source and broadcast to P2P. +// Data source: HA mode reads from ha.Subscribe(), non-HA from broadcastCh. +// HA Follower consumes channel to prevent buildup but does not broadcast. func (r *BlockBroadcastReactor) broadcastRoutine() { r.logger.Info("Starting block broadcast routine") + + var source <-chan *BlockV2 + if r.stateV2.IsHAMode() { + source = r.stateV2.HASubscribe() + } else { + source = r.stateV2.BroadcastCh() + } + for { select { case <-r.Quit(): return - case block := <-r.stateV2.BroadcastCh(): + case block := <-source: + // HA Follower: consume channel but do not broadcast. + // When promoted to Leader, the next block automatically starts broadcasting. + if r.stateV2.IsHAMode() && !r.stateV2.IsHALeader() { + continue + } r.recentBlocks.Add(block) r.broadcast(block) } @@ -245,14 +304,28 @@ func (r *BlockBroadcastReactor) applyRoutine() { // Core Logic // ============================================================================ -// onBlockV2: receive block from peer -// verifySig: true for broadcast channel, false for sync channel -func (r *BlockBroadcastReactor) onBlockV2(block *BlockV2, src p2p.Peer, verifySig bool) { - r.logger.Debug("onBlockV2", "number", block.Number, "hash", block.Hash.Hex(), "verifySig", verifySig) - // Dedup: skip if already seen, only for broadcast channel. - // Sync channel should not check dedup. - if r.markSeen(block.Hash) && verifySig { - r.logger.Debug("onBlockV2 broadcast dedup", "number", block.Number, "hash", block.Hash.Hex(), "verifySig", verifySig) +// onBlockV2: receive block from peer. +// fromBroadcast: true for broadcast channel (triggers gossip), false for sync channel. +func (r *BlockBroadcastReactor) onBlockV2(block *BlockV2, src p2p.Peer, fromBroadcast bool) { + // Only Fullnode (no signer) processes P2P blocks: + // - ActiveSequencer: sole producer, P2P blocks are only echoes or invalid + // - HA nodes: receive blocks via Raft, not P2P + if r.stateV2.HasSigner() { + return + } + + r.logger.Debug("onBlockV2", "number", block.Number, "hash", block.Hash.Hex(), "fromBroadcast", fromBroadcast) + + // Dedup: only for broadcast channel + if fromBroadcast && r.markSeen(block.Hash) { + r.logger.Debug("onBlockV2 broadcast dedup", "number", block.Number, "hash", block.Hash.Hex()) + return + } + + // Unified signature verification for all incoming blocks + if err := VerifyBlockSignature(r.verifier, block); err != nil { + r.logger.Error("Block signature verification failed, discarding", + "number", block.Number, "hash", block.Hash.Hex(), "err", err) return } @@ -261,24 +334,16 @@ func (r *BlockBroadcastReactor) onBlockV2(block *BlockV2, src p2p.Peer, verifySi localHeight := r.stateV2.LatestHeight() - // Try apply if it's the next block (height + parent match) if r.isNextBlock(block) { - if err := r.applyBlock(block, verifySig); err != nil { + if err := r.applyBlock(block); err != nil { r.logger.Error("Apply failed", "number", block.Number, "hash", block.Hash.Hex(), "err", err) - // Only cache blocks with valid signatures (don't cache signature failures) - if verifySig && !errors.Is(err, ErrInvalidSignature) { - r.logger.Debug("Apply failed, caching block", "number", block.Number, "hash", block.Hash.Hex()) - r.pendingCache.Add(block, uint64(localHeight)) - } + r.pendingCache.Add(block, uint64(localHeight)) return } - // Gossip the latest block to other peers - if verifySig { + if fromBroadcast { r.gossipBlock(block, src.ID()) } - } else if verifySig { - // Cache all other blocks (future or past for potential reorg) - r.logger.Debug("future block, caching", "number", block.Number, "hash", block.Hash.Hex()) + } else { r.pendingCache.Add(block, uint64(localHeight)) } } @@ -303,7 +368,7 @@ func (r *BlockBroadcastReactor) tryApplyFromCache() { break } r.logger.Debug("Trying to apply from cache", "number", block.Number, "hash", block.Hash.Hex()) - if err := r.applyBlock(block, true); err != nil { // should signature verification + if err := r.applyBlock(block); err != nil { r.logger.Error("Apply from cache failed", "number", block.Number, "err", err) break } @@ -331,17 +396,30 @@ func (r *BlockBroadcastReactor) checkSyncGap() { r.requestMissingBlocks(localHeight+1, maxPeerHeight) } -// requestMissingBlocks requests blocks in range [start, end] from peers +// requestMissingBlocks requests blocks in range [start, end] from peers. +// Respects maxOutstandingTotal, maxOutstandingPerPeer, and maxNewRequestsPerTick limits. func (r *BlockBroadcastReactor) requestMissingBlocks(start, end int64) { + r.cleanupExpiredRequests() + peers := r.Switch.Peers().List() if len(peers) == 0 { return } for height := start; height <= end; height++ { - // Find a peer that has this height + if r.pendingSyncCount() >= maxPendingSyncRequests { + break + } + // Skip if already have a pending request for this height + r.syncRequestsMu.Lock() + _, exists := r.syncRequests[height] + r.syncRequestsMu.Unlock() + if exists { + continue + } + + // findPeerWithHeight already skips peers exceeding per-peer limit peer := r.findPeerWithHeight(peers, height) - r.logger.Debug("Finding peer with height", "height", height, "peer", peer.ID()) if peer == nil { continue } @@ -352,14 +430,15 @@ func (r *BlockBroadcastReactor) requestMissingBlocks(start, end int64) { r.logger.Error("Failed to encode BlockRequest", "height", height, "err", err) continue } - r.logger.Info("Requesting block", "height", height, "peer", peer.ID()) if !peer.TrySend(SequencerSyncChannel, bz) { - r.logger.Error("Failed to send BlockRequest (TrySend failed)", "height", height, "peer", peer.ID()) + r.logger.Error("Failed to send BlockRequest", "height", height, "peer", peer.ID()) + continue } + r.recordSyncRequest(peer.ID(), height) } } -// findPeerWithHeight finds a random peer that has the given height. +// findPeerWithHeight finds a random peer that has the given height and is within per-peer request limit. // Uses random start index to avoid always hitting the same peer. func (r *BlockBroadcastReactor) findPeerWithHeight(peers []p2p.Peer, height int64) p2p.Peer { n := len(peers) @@ -367,11 +446,11 @@ func (r *BlockBroadcastReactor) findPeerWithHeight(peers []p2p.Peer, height int6 return nil } - // Random start, then wrap around start := rand.Intn(n) for i := 0; i < n; i++ { peer := peers[(start+i)%n] - if r.pool.GetPeerHeight(peer.ID()) >= height { + if r.pool.GetPeerHeight(peer.ID()) >= height && + r.pendingSyncCountByPeer(peer.ID()) < maxPendingSyncPerPeer { return peer } } @@ -389,18 +468,12 @@ func (r *BlockBroadcastReactor) isNextBlock(block *BlockV2) bool { return block.Number == currentBlock.Number+1 && block.ParentHash == currentBlock.Hash } -// applyBlock: verify and apply a block atomically -// Thread-safe: uses mutex to ensure sequential block application -// verifySig: true for broadcast channel blocks, false for sync channel blocks -func (r *BlockBroadcastReactor) applyBlock(block *BlockV2, verifySig bool) error { +// applyBlock verifies signature, applies the block atomically, and persists the signature. +// Thread-safe: uses mutex to ensure sequential block application. +func (r *BlockBroadcastReactor) applyBlock(block *BlockV2) error { r.applyMtx.Lock() defer r.applyMtx.Unlock() - // Verify signature only for broadcast channel - if verifySig && !r.verifySignature(block) { - return ErrInvalidSignature - } - // Verify parent currentBlock := r.stateV2.LatestBlock() if currentBlock != nil && block.ParentHash != currentBlock.Hash { @@ -412,47 +485,18 @@ func (r *BlockBroadcastReactor) applyBlock(block *BlockV2, verifySig bool) error return err } + // Persist signature for historical block serving + if err := r.sigStore.SaveSignature(block.Hash, block.Signature); err != nil { + panic(fmt.Sprintf("failed to save signature at height %d: %v", block.Number, err)) + } + // Add to recent blocks r.recentBlocks.Add(block) - r.logger.Info("Applied block", "number", block.Number, "verifySig", verifySig) + r.logger.Info("Applied block", "number", block.Number) return nil } -func (r *BlockBroadcastReactor) verifySignature(block *BlockV2) bool { - if len(block.Signature) == 0 { - r.logger.Error("Signature verification failed: empty signature", "block", block.Number) - return false - } - pubKey, err := crypto.SigToPub(block.Hash.Bytes(), block.Signature) - if err != nil { - r.logger.Error("Signature verification failed: SigToPub error", "block", block.Number, "err", err) - return false - } - recoveredAddr := crypto.PubkeyToAddress(*pubKey) - - if r.verifier == nil { - r.logger.Error("Sequencer verifier not set", "block", block.Number) - return false - } - - isSeq, err := r.verifier.IsSequencer(context.Background(), recoveredAddr) - if err != nil { - r.logger.Error("Signature verification failed: verifier error", - "block", block.Number, - "recovered", recoveredAddr.Hex(), - "err", err) - return false - } - if !isSeq { - r.logger.Error("Signature verification failed: not a valid sequencer", - "block", block.Number, - "recovered", recoveredAddr.Hex()) - return false - } - return true -} - // ============================================================================ // Gossip // ============================================================================ @@ -506,6 +550,79 @@ func (r *BlockBroadcastReactor) gossipBlock(block *BlockV2, fromPeer p2p.ID) { } } +// ============================================================================ +// Sync Request Tracking +// ============================================================================ + +// recordSyncRequest records a pending sync request keyed by height. +func (r *BlockBroadcastReactor) recordSyncRequest(peerID p2p.ID, height int64) { + r.syncRequestsMu.Lock() + defer r.syncRequestsMu.Unlock() + if old, ok := r.syncRequests[height]; ok { + r.syncPeerCounts[old.PeerID]-- + } + r.syncRequests[height] = &SyncReq{ + Height: height, + PeerID: peerID, + ExpireAt: time.Now().Add(syncRequestTTL), + } + r.syncPeerCounts[peerID]++ +} + +// checkAndTakeSyncRequest validates a sync response against pending requests. +// Only removes the entry if peerID matches and request is not expired. +func (r *BlockBroadcastReactor) checkAndTakeSyncRequest(peerID p2p.ID, height int64) bool { + r.syncRequestsMu.Lock() + defer r.syncRequestsMu.Unlock() + + req, ok := r.syncRequests[height] + if !ok || req.PeerID != peerID || time.Now().After(req.ExpireAt) { + return false + } + delete(r.syncRequests, height) + r.syncPeerCounts[peerID]-- + return true +} + +// cleanupExpiredRequests removes expired entries. Called before sending new requests. +func (r *BlockBroadcastReactor) cleanupExpiredRequests() { + r.syncRequestsMu.Lock() + defer r.syncRequestsMu.Unlock() + now := time.Now() + for h, req := range r.syncRequests { + if now.After(req.ExpireAt) { + r.syncPeerCounts[req.PeerID]-- + delete(r.syncRequests, h) + } + } +} + +// pendingSyncCount returns the total number of pending sync requests. +func (r *BlockBroadcastReactor) pendingSyncCount() int { + r.syncRequestsMu.Lock() + defer r.syncRequestsMu.Unlock() + return len(r.syncRequests) +} + +// pendingSyncCountByPeer returns the number of pending sync requests for a specific peer. +func (r *BlockBroadcastReactor) pendingSyncCountByPeer(peerID p2p.ID) int { + r.syncRequestsMu.Lock() + defer r.syncRequestsMu.Unlock() + return r.syncPeerCounts[peerID] +} + +// removeSyncRequestsByPeer removes all pending requests for a disconnected peer. +func (r *BlockBroadcastReactor) removeSyncRequestsByPeer(peerID p2p.ID) { + r.syncRequestsMu.Lock() + defer r.syncRequestsMu.Unlock() + for h, req := range r.syncRequests { + if req.PeerID == peerID { + delete(r.syncRequests, h) + } + } + delete(r.syncPeerCounts, peerID) +} + // ============================================================================ // Message Handlers // ============================================================================ @@ -534,6 +651,13 @@ func (r *BlockBroadcastReactor) onBlockRequest(msg *bcproto.BlockRequest, src p2 return } + // Attach signature from store if block doesn't already carry one + if len(block.Signature) == 0 && r.sigStore != nil { + if sig, err := r.sigStore.GetSignature(block.Hash); err == nil { + block.Signature = sig + } + } + resp := &bcproto.BlockResponseV2{ Block: BlockV2ToProto(block), } diff --git a/sequencer/interfaces.go b/sequencer/interfaces.go index b720965cdcf..433135a9ece 100644 --- a/sequencer/interfaces.go +++ b/sequencer/interfaces.go @@ -1,7 +1,6 @@ package sequencer import ( - "context" "errors" "github.com/morph-l2/go-ethereum/common" @@ -13,9 +12,15 @@ var ( ErrInvalidSignature = errors.New("invalid block signature") ) -// SequencerVerifier verifies if an address is the current L1 sequencer +// SequencerVerifier verifies if an address is a valid L1 sequencer. type SequencerVerifier interface { - IsSequencer(ctx context.Context, addr common.Address) (bool, error) + // IsSequencerAt checks if addr was the valid sequencer at the given L2 block height. + IsSequencerAt(addr common.Address, l2Height uint64) (bool, error) + + // VerificationStartHeight returns the L2 block height from which V2 signature + // verification is enforced (= upgradeBlockHeight). Blocks below this height are + // PBFT blocks and skip V2 verification. Returns math.MaxUint64 if not configured. + VerificationStartHeight() uint64 } // Signer interface for sequencer block signing @@ -24,6 +29,27 @@ type Signer interface { Sign(data []byte) ([]byte, error) // Address returns the sequencer's address Address() common.Address - // IsActiveSequencer checks if this signer is the current L1 sequencer - IsActiveSequencer(ctx context.Context) (bool, error) +} + +// SequencerHA is the abstraction for Raft HA cluster. +// In single-node mode, ha == nil and all HA-related logic is skipped. +type SequencerHA interface { + // IsLeader returns whether the current node is the Raft leader (sole block producer). + IsLeader() bool + + // Join adds this node to the Raft cluster. + // Precondition: node has synced to near chain tip via Fullnode mode. + // Fails if localHeight < raft.EarliestRetainedLogHeight. + // On success, the node is a full Raft member and P2P sync can be stopped. + Join() error + + // Commit replicates a signed block via Raft to the cluster. + // Blocks until majority of nodes have acknowledged receipt (NOT applied). + // ApplyBlock is handled separately by leader (after Commit) and followers (via Subscribe). + // On failure, the caller should abandon this block production round. + Commit(block *BlockV2) error + + // Subscribe returns a channel that delivers blocks after Raft commit. + // Both leader and follower subscribe; used by broadcastRoutine for P2P broadcast. + Subscribe() <-chan *BlockV2 } diff --git a/sequencer/signature_store.go b/sequencer/signature_store.go new file mode 100644 index 00000000000..b0cca3d652f --- /dev/null +++ b/sequencer/signature_store.go @@ -0,0 +1,51 @@ +package sequencer + +import ( + "fmt" + + "github.com/morph-l2/go-ethereum/common" + dbm "github.com/tendermint/tm-db" +) + +var signaturePrefix = []byte("block_sig:") + +// SignatureStore persists block signatures independently from geth. +// Key = "block_sig:" + blockHash (32 bytes), Value = signature (65 bytes). +type SignatureStore struct { + db dbm.DB +} + +// NewSignatureStore creates a new SignatureStore backed by the given DB. +func NewSignatureStore(db dbm.DB) *SignatureStore { + return &SignatureStore{db: db} +} + +func signatureKey(blockHash common.Hash) []byte { + key := make([]byte, len(signaturePrefix)+common.HashLength) + copy(key, signaturePrefix) + copy(key[len(signaturePrefix):], blockHash.Bytes()) + return key +} + +// SaveSignature persists a block's signature. +func (s *SignatureStore) SaveSignature(blockHash common.Hash, sig []byte) error { + if s == nil || s.db == nil { + return nil + } + return s.db.Set(signatureKey(blockHash), sig) +} + +// GetSignature retrieves a block's signature. Returns nil, error if not found. +func (s *SignatureStore) GetSignature(blockHash common.Hash) ([]byte, error) { + if s == nil || s.db == nil { + return nil, fmt.Errorf("signature store not initialized") + } + val, err := s.db.Get(signatureKey(blockHash)) + if err != nil { + return nil, err + } + if len(val) == 0 { + return nil, fmt.Errorf("signature not found for block %s", blockHash.Hex()) + } + return val, nil +} diff --git a/sequencer/state_v2.go b/sequencer/state_v2.go index 14833973de1..b18fb81096a 100644 --- a/sequencer/state_v2.go +++ b/sequencer/state_v2.go @@ -1,7 +1,6 @@ package sequencer import ( - "context" "fmt" "sync" "time" @@ -19,41 +18,53 @@ const ( // StateV2 manages the state for centralized sequencer mode. // It replaces the PBFT consensus state after the upgrade. +// +// Node roles: +// - Fullnode (signer==nil): only applyRoutine runs (in reactor), no block production +// - ActiveSequencer (signer!=nil, ha==nil): roleCheckRoutine + broadcastRoutine +// - HA-Leader (signer!=nil, ha!=nil, ha.IsLeader()==true): roleCheckRoutine + broadcastRoutine +// - HA-Follower (signer!=nil, ha!=nil, ha.IsLeader()==false): roleCheckRoutine (idle) type StateV2 struct { service.BaseService mtx sync.RWMutex // Core state - latestBlock *BlockV2 - sequencerMode bool // Whether the node is started in sequencer mode (has signer configured) + latestBlock *BlockV2 // Dependencies l2Node l2node.L2Node signer Signer verifier SequencerVerifier + sigStore *SignatureStore + ha SequencerHA // nil = single-node mode logger log.Logger // Block production - blockTicker *time.Ticker blockInterval time.Duration - // Broadcast channel - blocks produced by this sequencer are sent here + // Broadcast channel - non-HA self-produced blocks are sent here broadcastCh chan *BlockV2 - // Quit channel + // Lifecycle quitCh chan struct{} } // NewStateV2 creates a new StateV2 instance. -// sequencerMode is determined by whether a signer is provided. +// Node mode is determined by whether a signer and/or ha is provided. +// verifier is required when signer is configured (sequencer/HA nodes must verify blocks). func NewStateV2( l2Node l2node.L2Node, blockInterval time.Duration, logger log.Logger, verifier SequencerVerifier, signer Signer, + sigStore *SignatureStore, + ha SequencerHA, ) (*StateV2, error) { + if verifier == nil { + return nil, fmt.Errorf("sequencer verifier is required for V2 mode") + } if blockInterval <= 0 { blockInterval = DefaultBlockInterval } @@ -62,7 +73,8 @@ func NewStateV2( l2Node: l2Node, signer: signer, verifier: verifier, - sequencerMode: signer != nil, + sigStore: sigStore, + ha: ha, blockInterval: blockInterval, logger: logger.With("module", "stateV2"), broadcastCh: make(chan *BlockV2, 100), @@ -75,9 +87,8 @@ func NewStateV2( } // OnStart implements service.Service. -// It initializes state from geth and starts block production if this node is the active sequencer. +// Initializes state from geth. Nodes with a signer start roleCheckRoutine. func (s *StateV2) OnStart() error { - // Initialize latest block from geth latestBlock, err := s.l2Node.GetLatestBlockV2() if err != nil { return fmt.Errorf("failed to get latest block: %w", err) @@ -87,28 +98,17 @@ func (s *StateV2) OnStart() error { s.latestBlock = latestBlock s.mtx.Unlock() - var seqAddr string - var isActiveSequencer bool - if s.signer != nil { - seqAddr = s.signer.Address().Hex() - // Check if this node is the active sequencer via L1 contract - isActiveSequencer, err = s.signer.IsActiveSequencer(context.Background()) - if err != nil { - s.logger.Error("Failed to check sequencer status", "error", err) - isActiveSequencer = false - } - } - + // Use local variable to avoid accessing s.latestBlock without lock in log statement. s.logger.Info("StateV2 initialized", - "latestHeight", s.latestBlock.Number, - "latestHash", s.latestBlock.Hash.Hex(), - "sequencerMode", s.sequencerMode, - "isActiveSequencer", isActiveSequencer, - "seqAddr", seqAddr) - - // Start block production if sequencer mode is enabled and this node is the active sequencer - if s.sequencerMode && isActiveSequencer { - go s.produceBlockRoutine() + "latestHeight", latestBlock.Number, + "latestHash", latestBlock.Hash.Hex(), + "hasSigner", s.signer != nil, + "isHAMode", s.ha != nil) + + // Fullnode (no signer) does not produce blocks; applyRoutine is managed by the reactor. + // Nodes with signer start roleCheckRoutine, which handles dynamic role detection. + if s.signer != nil { + go s.roleCheckRoutine() } return nil @@ -118,71 +118,113 @@ func (s *StateV2) OnStart() error { func (s *StateV2) OnStop() { s.logger.Info("Stopping StateV2") close(s.quitCh) - if s.blockTicker != nil { - s.blockTicker.Stop() - } } -// produceBlockRoutine is the main loop for block production. -func (s *StateV2) produceBlockRoutine() { - s.blockTicker = time.NewTicker(s.blockInterval) - defer s.blockTicker.Stop() +// roleCheckRoutine is the unified loop for role detection and block production. +// It runs for all nodes with a signer (ActiveSequencer, HA-Leader, HA-Follower). +// On each tick it checks isActiveSequencer(): if true, produces a block; otherwise idles. +// This enables bidirectional role transitions without restarting the service. +func (s *StateV2) roleCheckRoutine() { + ticker := time.NewTicker(s.blockInterval) + defer ticker.Stop() - s.logger.Info("Starting block production routine", "interval", s.blockInterval) + s.logger.Info("Starting role check routine", "interval", s.blockInterval) for { select { case <-s.quitCh: - s.logger.Info("Block production routine stopped") + s.logger.Info("Role check routine stopped") return - case <-s.blockTicker.C: + case <-ticker.C: + if !s.isActiveSequencer() { + continue + } s.produceBlock() } } } -// produceBlock produces a new block and broadcasts it. +// isActiveSequencer returns true if this node should produce the next block. +// For HA mode: must be Raft leader AND L1-designated sequencer. +// For single-node mode: must be L1-designated sequencer. +func (s *StateV2) isActiveSequencer() bool { + // HA mode: must be Raft leader + if s.ha != nil && !s.ha.IsLeader() { + return false + } + + s.mtx.RLock() + lb := s.latestBlock + s.mtx.RUnlock() + if lb == nil { + return false + } + nextHeight := lb.Number + 1 + + ok, err := s.verifier.IsSequencerAt(s.signer.Address(), nextHeight) + if err != nil { + s.logger.Error("Failed to check sequencer status", "height", nextHeight, "err", err) + return false + } + return ok +} + +// produceBlock produces a new block, signs it, and either commits via Raft (HA) +// or applies locally and sends to broadcastCh (single-node). func (s *StateV2) produceBlock() { - s.mtx.Lock() + s.mtx.RLock() parentHash := s.latestBlock.Hash - s.mtx.Unlock() + s.mtx.RUnlock() s.logger.Debug("Producing block", "parentHash", parentHash.Hex()) - // Request block data from geth (pass hash as bytes) block, collectedL1Msgs, err := s.l2Node.RequestBlockDataV2(parentHash.Bytes()) if err != nil { s.logger.Error("Failed to request block data", "error", err) return } - _ = collectedL1Msgs // TODO: log or use this info + _ = collectedL1Msgs - // Sign the block if err := s.signBlock(block); err != nil { s.logger.Error("Failed to sign block", "error", err) return } - // ********************* RAFT HA ********************* - // TODO: add raft HA - // **************************************************** + if err := s.sigStore.SaveSignature(block.Hash, block.Signature); err != nil { + panic(fmt.Sprintf("failed to save signature at height %d: %v", block.Number, err)) + } + + // HA mode: replicate via Raft consensus (data replication + majority ACK). + // ha.Commit() only ensures majority of cluster nodes have received the block data. + // Leader applies locally after Commit. Followers apply via Raft FSM internally. + // Broadcast to P2P happens via ha.Subscribe() -> broadcastRoutine on all HA nodes. + if s.ha != nil { + if err := s.ha.Commit(block); err != nil { + s.logger.Error("Failed to commit block via HA", "number", block.Number, "err", err) + return + } + s.logger.Debug("Block committed via HA", "number", block.Number, "hash", block.Hash.Hex()) + } - // Apply the block to geth and update local state + // Apply locally (HA leader + non-HA both apply here) if err := s.ApplyBlock(block); err != nil { s.logger.Error("Failed to apply block", "error", err) return } - // Send to broadcast channel - select { - case s.broadcastCh <- block: - s.logger.Debug("Block produced and queued for broadcast", - "number", block.Number, - "hash", block.Hash.Hex(), - "txCount", len(block.Transactions), - "collectedL1Msgs", collectedL1Msgs) - default: - s.logger.Error("Broadcast channel full, dropping block", "number", block.Number) + // Non-HA: broadcast via broadcastCh -> broadcastRoutine + // HA: broadcast via ha.Subscribe() -> broadcastRoutine (don't write broadcastCh) + if s.ha == nil { + select { + case s.broadcastCh <- block: + s.logger.Debug("Block produced and queued for broadcast", + "number", block.Number, + "hash", block.Hash.Hex(), + "txCount", len(block.Transactions), + "collectedL1Msgs", collectedL1Msgs) + default: + s.logger.Error("Broadcast channel full, dropping block", "number", block.Number) + } } } @@ -191,19 +233,33 @@ func (s *StateV2) signBlock(block *BlockV2) error { if s.signer == nil { return fmt.Errorf("signer not set") } - - // Sign the block hash signature, err := s.signer.Sign(block.Hash.Bytes()) if err != nil { return fmt.Errorf("failed to sign block: %w", err) } - block.Signature = signature - s.logger.Debug("Block signed", "number", block.Number, "hash", block.Hash.Hex(), "signer", s.signer.Address().Hex()) return nil } +// ApplyBlock applies a block to L2 and updates local state. +// Serialized by mutex to prevent concurrent application. +// Idempotent: silently skips blocks already applied. +func (s *StateV2) ApplyBlock(block *BlockV2) error { + s.mtx.Lock() + defer s.mtx.Unlock() + + if s.latestBlock != nil && block.Number <= s.latestBlock.Number { + return nil // idempotent: already applied or older block + } + + if err := s.l2Node.ApplyBlockV2(block); err != nil { + return err + } + s.latestBlock = block + return nil +} + // LatestHeight returns the latest block height. func (s *StateV2) LatestHeight() int64 { s.mtx.RLock() @@ -221,36 +277,44 @@ func (s *StateV2) LatestBlock() *BlockV2 { return s.latestBlock } -// BroadcastCh returns the channel for blocks to be broadcast. -// No lock needed - channel itself is thread-safe. +// BroadcastCh returns the channel for self-produced blocks (non-HA mode only). func (s *StateV2) BroadcastCh() <-chan *BlockV2 { return s.broadcastCh } -// ApplyBlock applies a block to L2 and updates local state. -// This is the unified entry point for block application. -func (s *StateV2) ApplyBlock(block *BlockV2) error { - // Apply to L2 execution layer - if err := s.l2Node.ApplyBlockV2(block); err != nil { - return err - } +// GetBlockByNumber gets a block from l2node by number. +func (s *StateV2) GetBlockByNumber(number uint64) (*BlockV2, error) { + return s.l2Node.GetBlockByNumber(number) +} - // Update local state - s.mtx.Lock() - s.latestBlock = block - s.mtx.Unlock() +// HasSigner returns whether this node has a signer configured. +// Fullnode returns false; ActiveSequencer and HA nodes return true. +func (s *StateV2) HasSigner() bool { + return s.signer != nil +} - return nil +// IsHAMode returns whether this node is in HA mode (ha != nil). +func (s *StateV2) IsHAMode() bool { + return s.ha != nil } -// GetBlockByNumber gets a block from l2node by number. -// Uses geth's eth_getBlockByNumber RPC internally. -func (s *StateV2) GetBlockByNumber(number uint64) (*BlockV2, error) { - return s.l2Node.GetBlockByNumber(number) +// IsHALeader returns whether this node is the current Raft leader. +// Returns false if not in HA mode. +func (s *StateV2) IsHALeader() bool { + return s.ha != nil && s.ha.IsLeader() +} + +// HASubscribe returns the HA block delivery channel. +// Panics if not in HA mode. +func (s *StateV2) HASubscribe() <-chan *BlockV2 { + if s.ha == nil { + panic("HASubscribe called but not in HA mode") + } + return s.ha.Subscribe() } -// IsSequencerMode returns whether this node is started in sequencer mode. -// This means the node has a signer configured and can potentially produce blocks. +// IsSequencerMode returns whether this node has a signer configured. +// Deprecated: use HasSigner() instead. TODO: remove after all callers are updated. func (s *StateV2) IsSequencerMode() bool { - return s.sequencerMode + return s.signer != nil } diff --git a/sequencer/state_v2_test.go b/sequencer/state_v2_test.go index d33f25293fb..b7dcc136bcf 100644 --- a/sequencer/state_v2_test.go +++ b/sequencer/state_v2_test.go @@ -1,7 +1,7 @@ package sequencer import ( - "context" + "errors" "testing" "time" @@ -11,11 +11,14 @@ import ( "github.com/tendermint/tendermint/types" ) -// mockSignerImpl is a mock implementation of Signer for testing +// ============================================================================ +// Mock implementations +// ============================================================================ + +// mockSignerImpl is a mock implementation of Signer for testing. type mockSignerImpl struct { address common.Address signature []byte - isActive bool } func (m *mockSignerImpl) Sign(data []byte) ([]byte, error) { @@ -29,24 +32,60 @@ func (m *mockSignerImpl) Address() common.Address { return m.address } -func (m *mockSignerImpl) IsActiveSequencer(ctx context.Context) (bool, error) { - return m.isActive, nil +// mockSequencerVerifier is a mock implementation of SequencerVerifier for testing. +type mockSequencerVerifier struct { + isSequencer bool + startHeight uint64 + err error +} + +func (m *mockSequencerVerifier) IsSequencerAt(addr common.Address, l2Height uint64) (bool, error) { + if m.err != nil { + return false, m.err + } + return m.isSequencer, nil +} + +func (m *mockSequencerVerifier) VerificationStartHeight() uint64 { + return m.startHeight +} + +// mockSequencerHA is a mock implementation of SequencerHA for testing. +type mockSequencerHA struct { + leader bool + commitErr error + subCh chan *BlockV2 +} + +func newMockSequencerHA(leader bool) *mockSequencerHA { + return &mockSequencerHA{ + leader: leader, + subCh: make(chan *BlockV2, 10), + } } -// newTestMockL2Node creates a mock L2Node for testing +func (m *mockSequencerHA) IsLeader() bool { return m.leader } +func (m *mockSequencerHA) Join() error { return nil } +func (m *mockSequencerHA) Commit(block *BlockV2) error { return m.commitErr } +func (m *mockSequencerHA) Subscribe() <-chan *BlockV2 { return m.subCh } + +// newTestMockL2Node creates a mock L2Node for testing. func newTestMockL2Node() l2node.L2Node { return l2node.NewMockL2Node(0, "") } +// ============================================================================ +// Existing tests (adapted for new signature) +// ============================================================================ + func TestStateV2_NewStateV2(t *testing.T) { mockL2Node := newTestMockL2Node() logger := log.NewNopLogger() - stateV2, err := NewStateV2(mockL2Node, time.Second, logger, nil, nil) + stateV2, err := NewStateV2(mockL2Node, time.Second, logger, &mockSequencerVerifier{}, nil, nil, nil) if err != nil { t.Fatalf("NewStateV2 failed: %v", err) } - if stateV2 == nil { t.Fatal("StateV2 should not be nil") } @@ -56,41 +95,32 @@ func TestStateV2_LatestHeight(t *testing.T) { mockL2Node := newTestMockL2Node() logger := log.NewNopLogger() - stateV2, err := NewStateV2(mockL2Node, time.Second, logger, nil, nil) + stateV2, err := NewStateV2(mockL2Node, time.Second, logger, &mockSequencerVerifier{}, nil, nil, nil) if err != nil { t.Fatalf("NewStateV2 failed: %v", err) } - // Before start, latestBlock should be nil height := stateV2.LatestHeight() if height != 0 { t.Errorf("LatestHeight before start = %d, want 0", height) } } -func TestStateV2_IsSequencerMode(t *testing.T) { +func TestStateV2_HasSigner_MatchesIsSequencerMode(t *testing.T) { mockL2Node := newTestMockL2Node() logger := log.NewNopLogger() + mockVerifier := &mockSequencerVerifier{} - // Without signer, sequencerMode should be false - stateV2, err := NewStateV2(mockL2Node, time.Second, logger, nil, nil) - if err != nil { - t.Fatalf("NewStateV2 failed: %v", err) + // Without signer + s1, _ := NewStateV2(mockL2Node, time.Second, logger, mockVerifier, nil, nil, nil) + if s1.HasSigner() || s1.IsSequencerMode() { + t.Error("should be false when signer is nil") } - if stateV2.IsSequencerMode() { - t.Error("IsSequencerMode should be false when signer is nil") - } - - // With mock signer, sequencerMode should be true - mockSigner := &mockSignerImpl{} - stateV2WithSigner, err := NewStateV2(mockL2Node, time.Second, logger, nil, mockSigner) - if err != nil { - t.Fatalf("NewStateV2 failed: %v", err) - } - - if !stateV2WithSigner.IsSequencerMode() { - t.Error("IsSequencerMode should be true when signer is provided") + // With signer + s2, _ := NewStateV2(mockL2Node, time.Second, logger, mockVerifier, &mockSignerImpl{}, nil, nil) + if !s2.HasSigner() || !s2.IsSequencerMode() { + t.Error("should be true when signer is provided") } } @@ -98,33 +128,22 @@ func TestStateV2_SignBlock(t *testing.T) { mockL2Node := newTestMockL2Node() logger := log.NewNopLogger() - // Create mock signer - mockSigner := &mockSignerImpl{ - signature: make([]byte, 65), // Mock 65-byte signature - } + mockSigner := &mockSignerImpl{signature: make([]byte, 65)} + mockVerifier := &mockSequencerVerifier{} - stateV2, err := NewStateV2(mockL2Node, time.Second, logger, nil, mockSigner) + stateV2, err := NewStateV2(mockL2Node, time.Second, logger, mockVerifier, mockSigner, nil, nil) if err != nil { t.Fatalf("NewStateV2 failed: %v", err) } - // Create a test block block := &types.BlockV2{ Number: 1, Hash: [32]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32}, } - // Sign the block - err = stateV2.signBlock(block) - if err != nil { + if err := stateV2.signBlock(block); err != nil { t.Fatalf("signBlock failed: %v", err) } - - if len(block.Signature) == 0 { - t.Error("Block signature should not be empty") - } - - // Verify signature length (Ethereum signatures are 65 bytes) if len(block.Signature) != 65 { t.Errorf("Signature length = %d, want 65", len(block.Signature)) } @@ -134,33 +153,254 @@ func TestStateV2_SignBlockWithoutSigner(t *testing.T) { mockL2Node := newTestMockL2Node() logger := log.NewNopLogger() - // Create StateV2 without signer - stateV2, err := NewStateV2(mockL2Node, time.Second, logger, nil, nil) + stateV2, err := NewStateV2(mockL2Node, time.Second, logger, &mockSequencerVerifier{}, nil, nil, nil) if err != nil { t.Fatalf("NewStateV2 failed: %v", err) } - block := &types.BlockV2{ - Number: 1, - Hash: [32]byte{1, 2, 3, 4}, + block := &types.BlockV2{Number: 1, Hash: [32]byte{1, 2, 3, 4}} + if err := stateV2.signBlock(block); err == nil { + t.Error("signBlock should fail without signer") } +} + +// ============================================================================ +// New tests: verifier requirement +// ============================================================================ - // Sign should fail without signer - err = stateV2.signBlock(block) +func TestNewStateV2_VerifierRequired(t *testing.T) { + mockL2Node := newTestMockL2Node() + logger := log.NewNopLogger() + + // verifier==nil should fail + _, err := NewStateV2(mockL2Node, time.Second, logger, nil, nil, nil, nil) if err == nil { - t.Error("signBlock should fail without signer") + t.Fatal("expected error when verifier is nil") + } + + // with signer, still fails without verifier + mockSigner := &mockSignerImpl{} + _, err = NewStateV2(mockL2Node, time.Second, logger, nil, mockSigner, nil, nil) + if err == nil { + t.Fatal("expected error when verifier is nil (with signer)") } } -// Helper to create a test StateV2 -func createTestStateV2(t *testing.T, signer Signer) *StateV2 { +func TestNewStateV2_WithHA(t *testing.T) { mockL2Node := newTestMockL2Node() logger := log.NewNopLogger() + mockSigner := &mockSignerImpl{} + mockVerifier := &mockSequencerVerifier{} + ha := newMockSequencerHA(true) - stateV2, err := NewStateV2(mockL2Node, time.Second, logger, nil, signer) + stateV2, err := NewStateV2(mockL2Node, time.Second, logger, mockVerifier, mockSigner, nil, ha) if err != nil { t.Fatalf("NewStateV2 failed: %v", err) } + if !stateV2.IsHAMode() { + t.Error("IsHAMode should be true when ha is provided") + } +} + +// ============================================================================ +// New tests: node mode helpers +// ============================================================================ + +func TestStateV2_HasSigner(t *testing.T) { + mockL2Node := newTestMockL2Node() + logger := log.NewNopLogger() + + fullnode, _ := NewStateV2(mockL2Node, time.Second, logger, &mockSequencerVerifier{}, nil, nil, nil) + if fullnode.HasSigner() { + t.Error("Fullnode should not have signer") + } + + mockSigner := &mockSignerImpl{} + mockVerifier := &mockSequencerVerifier{} + seqNode, _ := NewStateV2(mockL2Node, time.Second, logger, mockVerifier, mockSigner, nil, nil) + if !seqNode.HasSigner() { + t.Error("Sequencer node should have signer") + } +} + +func TestStateV2_IsHAMode(t *testing.T) { + mockL2Node := newTestMockL2Node() + logger := log.NewNopLogger() + mockSigner := &mockSignerImpl{} + mockVerifier := &mockSequencerVerifier{} + + // Non-HA + nonHA, _ := NewStateV2(mockL2Node, time.Second, logger, mockVerifier, mockSigner, nil, nil) + if nonHA.IsHAMode() { + t.Error("IsHAMode should be false without ha") + } + + // HA + ha, _ := NewStateV2(mockL2Node, time.Second, logger, mockVerifier, mockSigner, nil, newMockSequencerHA(false)) + if !ha.IsHAMode() { + t.Error("IsHAMode should be true with ha") + } +} + +func TestStateV2_IsHALeader(t *testing.T) { + mockL2Node := newTestMockL2Node() + logger := log.NewNopLogger() + mockSigner := &mockSignerImpl{} + mockVerifier := &mockSequencerVerifier{} + + // Non-HA: never leader + nonHA, _ := NewStateV2(mockL2Node, time.Second, logger, mockVerifier, mockSigner, nil, nil) + if nonHA.IsHALeader() { + t.Error("non-HA node should not be HA leader") + } - return stateV2 + // HA follower + follower, _ := NewStateV2(mockL2Node, time.Second, logger, mockVerifier, mockSigner, nil, newMockSequencerHA(false)) + if follower.IsHALeader() { + t.Error("HA follower should not be leader") + } + + // HA leader + leader, _ := NewStateV2(mockL2Node, time.Second, logger, mockVerifier, mockSigner, nil, newMockSequencerHA(true)) + if !leader.IsHALeader() { + t.Error("HA leader should be leader") + } +} + +// ============================================================================ +// New tests: isActiveSequencer +// ============================================================================ + +func TestStateV2_IsActiveSequencer_NonHA_Active(t *testing.T) { + mockL2Node := newTestMockL2Node() + logger := log.NewNopLogger() + mockSigner := &mockSignerImpl{address: common.HexToAddress("0x1")} + mockVerifier := &mockSequencerVerifier{isSequencer: true} + + s, _ := NewStateV2(mockL2Node, time.Second, logger, mockVerifier, mockSigner, nil, nil) + s.latestBlock = &BlockV2{Number: 0} + + if !s.isActiveSequencer() { + t.Error("should be active sequencer when verifier returns true") + } +} + +func TestStateV2_IsActiveSequencer_NonHA_Inactive(t *testing.T) { + mockL2Node := newTestMockL2Node() + logger := log.NewNopLogger() + mockSigner := &mockSignerImpl{address: common.HexToAddress("0x1")} + mockVerifier := &mockSequencerVerifier{isSequencer: false} + + s, _ := NewStateV2(mockL2Node, time.Second, logger, mockVerifier, mockSigner, nil, nil) + s.latestBlock = &BlockV2{Number: 0} + + if s.isActiveSequencer() { + t.Error("should not be active sequencer when verifier returns false") + } +} + +func TestStateV2_IsActiveSequencer_HA_Leader(t *testing.T) { + mockL2Node := newTestMockL2Node() + logger := log.NewNopLogger() + mockSigner := &mockSignerImpl{address: common.HexToAddress("0x1")} + mockVerifier := &mockSequencerVerifier{isSequencer: true} + ha := newMockSequencerHA(true) + + s, _ := NewStateV2(mockL2Node, time.Second, logger, mockVerifier, mockSigner, nil, ha) + s.latestBlock = &BlockV2{Number: 0} + + if !s.isActiveSequencer() { + t.Error("HA leader should be active sequencer") + } +} + +func TestStateV2_IsActiveSequencer_HA_Follower(t *testing.T) { + mockL2Node := newTestMockL2Node() + logger := log.NewNopLogger() + mockSigner := &mockSignerImpl{address: common.HexToAddress("0x1")} + mockVerifier := &mockSequencerVerifier{isSequencer: true} // L1 says active + ha := newMockSequencerHA(false) // but not leader + + s, _ := NewStateV2(mockL2Node, time.Second, logger, mockVerifier, mockSigner, nil, ha) + s.latestBlock = &BlockV2{Number: 0} + + if s.isActiveSequencer() { + t.Error("HA follower should not be active sequencer even if L1 says active") + } +} + +func TestStateV2_IsActiveSequencer_VerifierError(t *testing.T) { + mockL2Node := newTestMockL2Node() + logger := log.NewNopLogger() + mockSigner := &mockSignerImpl{} + mockVerifier := &mockSequencerVerifier{err: errors.New("rpc error")} + + s, _ := NewStateV2(mockL2Node, time.Second, logger, mockVerifier, mockSigner, nil, nil) + s.latestBlock = &BlockV2{Number: 0} + + if s.isActiveSequencer() { + t.Error("should return false when verifier returns error") + } +} + +// ============================================================================ +// New tests: ApplyBlock +// ============================================================================ + +func TestStateV2_ApplyBlock_Idempotent(t *testing.T) { + mockL2Node := newTestMockL2Node() + logger := log.NewNopLogger() + + s, _ := NewStateV2(mockL2Node, time.Second, logger, &mockSequencerVerifier{}, nil, nil, nil) + block := &types.BlockV2{Number: 1} + + // Apply twice should not error + if err := s.ApplyBlock(block); err != nil { + t.Fatalf("first apply failed: %v", err) + } + if err := s.ApplyBlock(block); err != nil { + t.Fatalf("second apply (idempotent) failed: %v", err) + } + if s.LatestHeight() != 1 { + t.Errorf("LatestHeight = %d, want 1", s.LatestHeight()) + } +} + +func TestStateV2_ApplyBlock_OlderBlockSkipped(t *testing.T) { + mockL2Node := newTestMockL2Node() + logger := log.NewNopLogger() + + s, _ := NewStateV2(mockL2Node, time.Second, logger, &mockSequencerVerifier{}, nil, nil, nil) + + block2 := &types.BlockV2{Number: 2} + block1 := &types.BlockV2{Number: 1} + + if err := s.ApplyBlock(block2); err != nil { + t.Fatalf("apply block2 failed: %v", err) + } + // Apply older block should be skipped silently + if err := s.ApplyBlock(block1); err != nil { + t.Fatalf("apply older block should not error: %v", err) + } + // latestBlock should still be 2 + if s.LatestHeight() != 2 { + t.Errorf("LatestHeight = %d, want 2", s.LatestHeight()) + } +} + +func TestStateV2_ApplyBlock_Sequential(t *testing.T) { + mockL2Node := newTestMockL2Node() + logger := log.NewNopLogger() + + s, _ := NewStateV2(mockL2Node, time.Second, logger, &mockSequencerVerifier{}, nil, nil, nil) + + for i := uint64(1); i <= 5; i++ { + block := &types.BlockV2{Number: i} + if err := s.ApplyBlock(block); err != nil { + t.Fatalf("apply block %d failed: %v", i, err) + } + } + if s.LatestHeight() != 5 { + t.Errorf("LatestHeight = %d, want 5", s.LatestHeight()) + } } From df1227163c2aa94a8f7d8f5b70435a9b41fa7d47 Mon Sep 17 00:00:00 2001 From: "allen.wu" Date: Fri, 27 Mar 2026 15:41:26 +0800 Subject: [PATCH 2/2] refactor: remove dead code and duplicate proto conversion - Remove SequencerAddress, SetSequencerAddress, IsSequencerAddress, RecoverBlockV2Signer from types/block_v2.go (unused since SequencerVerifier) - Remove duplicate BlockV2ToProto/ProtoToBlockV2 from broadcast_reactor.go, use types.BlockV2ToProto instead - Clean up unused imports (fmt, crypto, math/big, seqproto) Co-Authored-By: Claude Opus 4.6 (1M context) --- sequencer/broadcast_reactor.go | 61 +++------------------------------- types/block_v2.go | 31 ----------------- 2 files changed, 4 insertions(+), 88 deletions(-) diff --git a/sequencer/broadcast_reactor.go b/sequencer/broadcast_reactor.go index b75174fd154..1db8f69b28d 100644 --- a/sequencer/broadcast_reactor.go +++ b/sequencer/broadcast_reactor.go @@ -2,7 +2,6 @@ package sequencer import ( "fmt" - "math/big" "math/rand" "reflect" "sync" @@ -14,7 +13,6 @@ import ( "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/p2p" bcproto "github.com/tendermint/tendermint/proto/tendermint/blocksync" - seqproto "github.com/tendermint/tendermint/proto/tendermint/sequencer" "github.com/tendermint/tendermint/types" ) @@ -446,7 +444,7 @@ func (r *BlockBroadcastReactor) findPeerWithHeight(peers []p2p.Peer, height int6 return nil } - start := rand.Intn(n) + start := rand.Intn(n) //nolint:gosec // non-security: peer selection randomization for i := 0; i < n; i++ { peer := peers[(start+i)%n] if r.pool.GetPeerHeight(peer.ID()) >= height && @@ -521,7 +519,7 @@ func (r *BlockBroadcastReactor) hasSentToPeer(peerID p2p.ID, hash common.Hash) b func (r *BlockBroadcastReactor) gossipBlock(block *BlockV2, fromPeer p2p.ID) { r.logger.Info("Gossiping block", "number", block.Number, "hash", block.Hash.Hex(), "fromPeer", fromPeer) msg := &bcproto.BlockResponseV2{ - Block: BlockV2ToProto(block), + Block: types.BlockV2ToProto(block), } bz, err := encodeMsg(msg) if err != nil { @@ -659,7 +657,7 @@ func (r *BlockBroadcastReactor) onBlockRequest(msg *bcproto.BlockRequest, src p2 } resp := &bcproto.BlockResponseV2{ - Block: BlockV2ToProto(block), + Block: types.BlockV2ToProto(block), } bz, err := encodeMsg(resp) if err != nil { @@ -672,7 +670,7 @@ func (r *BlockBroadcastReactor) onBlockRequest(msg *bcproto.BlockRequest, src p2 // broadcast only for sequencer func (r *BlockBroadcastReactor) broadcast(block *BlockV2) { resp := &bcproto.BlockResponseV2{ - Block: BlockV2ToProto(block), + Block: types.BlockV2ToProto(block), } bz, err := encodeMsg(resp) if err != nil { @@ -683,57 +681,6 @@ func (r *BlockBroadcastReactor) broadcast(block *BlockV2) { r.logger.Info("Broadcast block", "number", block.Number, "hash", block.Hash.Hex()) } -// ============================================================================ -// Proto Conversion -// ============================================================================ - -func BlockV2ToProto(block *BlockV2) *seqproto.BlockV2 { - var baseFee []byte - if block.BaseFee != nil { - baseFee = block.BaseFee.Bytes() - } - return &seqproto.BlockV2{ - ParentHash: block.ParentHash.Bytes(), - Miner: block.Miner.Bytes(), - Number: block.Number, - GasLimit: block.GasLimit, - BaseFee: baseFee, - Timestamp: block.Timestamp, - Transactions: block.Transactions, - StateRoot: block.StateRoot.Bytes(), - GasUsed: block.GasUsed, - ReceiptRoot: block.ReceiptRoot.Bytes(), - LogsBloom: block.LogsBloom, - WithdrawTrieRoot: block.WithdrawTrieRoot.Bytes(), - NextL1MessageIndex: block.NextL1MessageIndex, - Hash: block.Hash.Bytes(), - Signature: block.Signature, - } -} - -func ProtoToBlockV2(pb *seqproto.BlockV2) *BlockV2 { - baseFee := new(big.Int) - if len(pb.BaseFee) > 0 { - baseFee.SetBytes(pb.BaseFee) - } - return &BlockV2{ - ParentHash: common.BytesToHash(pb.ParentHash), - Miner: common.BytesToAddress(pb.Miner), - Number: pb.Number, - GasLimit: pb.GasLimit, - BaseFee: baseFee, - Timestamp: pb.Timestamp, - Transactions: pb.Transactions, - StateRoot: common.BytesToHash(pb.StateRoot), - GasUsed: pb.GasUsed, - ReceiptRoot: common.BytesToHash(pb.ReceiptRoot), - LogsBloom: pb.LogsBloom, - WithdrawTrieRoot: common.BytesToHash(pb.WithdrawTrieRoot), - NextL1MessageIndex: pb.NextL1MessageIndex, - Hash: common.BytesToHash(pb.Hash), - Signature: pb.Signature, - } -} // ==================== Message Encoding/Decoding ==================== // Local copies to avoid import cycle with blocksync package diff --git a/types/block_v2.go b/types/block_v2.go index 58e65a8d3d0..ce447a3b3be 100644 --- a/types/block_v2.go +++ b/types/block_v2.go @@ -2,11 +2,9 @@ package types import ( "errors" - "fmt" "math/big" "github.com/morph-l2/go-ethereum/common" - "github.com/morph-l2/go-ethereum/crypto" seqproto "github.com/tendermint/tendermint/proto/tendermint/sequencer" ) @@ -61,35 +59,6 @@ type SyncableBlock interface { // Ensure BlockV2 implements SyncableBlock var _ SyncableBlock = (*BlockV2)(nil) -// SequencerAddress is the expected sequencer address for signature verification. -// This will be set by the sequencer package at init time. -var SequencerAddress common.Address - -// SetSequencerAddress sets the expected sequencer address. -func SetSequencerAddress(addr common.Address) { - SequencerAddress = addr -} - -// IsSequencerAddress checks if the given address is the expected sequencer. -func IsSequencerAddress(addr common.Address) bool { - return addr == SequencerAddress -} - -// RecoverBlockV2Signer recovers the signer address from the block's signature. -func RecoverBlockV2Signer(block *BlockV2) (common.Address, error) { - if len(block.Signature) == 0 { - return common.Address{}, fmt.Errorf("block has no signature") - } - - // Recover the public key from the signature - pubKey, err := crypto.SigToPub(block.Hash.Bytes(), block.Signature) - if err != nil { - return common.Address{}, fmt.Errorf("failed to recover public key: %w", err) - } - - return crypto.PubkeyToAddress(*pubKey), nil -} - // BlockV2FromProto converts a proto BlockV2 to types.BlockV2. func BlockV2FromProto(pb *seqproto.BlockV2) (*BlockV2, error) { if pb == nil {