Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 18 additions & 27 deletions consensus/wbft/backend/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,9 @@ import (
"github.com/ethereum/go-ethereum/triedb"
)

var blockEnqueueChannel chan *types.Block

type fakeBroadcaster struct {
blockFetcher *fetcher.BlockFetcher
blockFetcher *fetcher.BlockFetcher
blockEnqueueChannel chan *types.Block
}

type otherNode struct {
Expand All @@ -65,7 +64,6 @@ type otherNode struct {
}

func makeFakeBroadcaster(chain *core.BlockChain) *fakeBroadcaster {
blockEnqueueChannel = make(chan *types.Block)
validator := func(header *types.Header) error {
return chain.Engine().VerifyHeader(chain, header)
}
Expand All @@ -76,26 +74,16 @@ func makeFakeBroadcaster(chain *core.BlockChain) *fakeBroadcaster {
return chain.CurrentBlock().Number.Uint64()
}

//inserter := func(blocks types.Blocks) (int, error) {
// idx, err := chain.InsertChain(blocks)
// if err == nil {
// header := blocks[len(blocks)-1].Header()
// chain.SetFinalized(header)
// chain.SetSafe(header)
// }
// // ## Wemix END
// return idx, err
//}

fb := fakeBroadcaster{
blockFetcher: fetcher.NewBlockFetcher(false, nil, chain.GetBlockByHash, validator, broadcastBlock, heighter, nil, nil, nil),
blockFetcher: fetcher.NewBlockFetcher(false, nil, chain.GetBlockByHash, validator, broadcastBlock, heighter, nil, nil, nil),
blockEnqueueChannel: make(chan *types.Block),
}
fb.blockFetcher.Start()
return &fb
}

func (fb *fakeBroadcaster) Enqueue(id string, block *types.Block) {
go func() { blockEnqueueChannel <- block }()
go func() { fb.blockEnqueueChannel <- block }()
}

func (fb *fakeBroadcaster) FindPeers(targets map[common.Address]bool) map[common.Address]consensus.Peer {
Expand Down Expand Up @@ -211,17 +199,20 @@ func makeBlock(chain *core.BlockChain, engine *Backend, parent *types.Block) *ty

// makeBlock create block executing no txs without seal
func makeBlockWithoutSeal(chain *core.BlockChain, engine *Backend, parent *types.Block) *types.Block {
engine.NewChainHead() // progress to next sequence
if engine.core.GetState() != wbftcore.StateAcceptRequest {
ticker := time.NewTicker(100 * time.Millisecond)
for {
<-ticker.C
if engine.core.GetState() == wbftcore.StateAcceptRequest {
ticker.Stop()
break
}
}
sub := engine.EventMux().Subscribe(wbft.FinalCommittedEvent{})
defer sub.Unsubscribe()

if err := engine.NewChainHead(); err != nil {
panic("NewChainHead failed: " + err.Error())
}

select {
case <-sub.Chan():
break
case <-time.After(10 * time.Second):
panic("timeout waiting for FinalCommittedEvent")
}

header := makeHeader(chain.Config(), engine.config, parent)
engine.Prepare(chain, header)
block := types.NewBlock(header, nil, nil, nil, trie.NewStackTrie(nil))
Expand Down
47 changes: 35 additions & 12 deletions consensus/wbft/backend/multiengine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"math/big"
"math/rand"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -55,9 +56,9 @@ type testEnv struct {
// properties for test scenario
down map[common.Address]bool
msgDisabled map[common.Address]map[uint64]bool
mu sync.RWMutex
}

// make n engines with given genesis and cfg
func MakeMultiEngineTestEnv(n int) (env *testEnv) {
env = &testEnv{}

Expand All @@ -68,20 +69,23 @@ func MakeMultiEngineTestEnv(n int) (env *testEnv) {
env.msgDisabled = make(map[common.Address]map[uint64]bool)
env.results = make(map[common.Address]chan *types.Block)

config := new(wbft.Config)
wbft.SetConfigFromChainConfig(config, genesis.Config)
config.BlockPeriod = 1
config.Epoch = 100
config.RequestTimeout = 2000
config.MaxRequestTimeoutSeconds = 2
config.AllowedFutureBlockTime = 100000000 // to skip future block check; this makes block creation time to be very short

env.addrs = make([]common.Address, n)
env.index = make(map[common.Address]int)
env.chains = make(map[common.Address]*core.BlockChain)
env.engines = make(map[common.Address]*Backend)
env.subResult = make(chan *types.Block)
for i, nodeKey := range nodeKeys {
// Each engine must have its own Config (and thus its own ProposerPolicy);
// otherwise Start() -> startWBFT() -> ProposerPolicy.Use() causes a data race
// when multiple engines are started concurrently (go engine.Start(...)).
config := new(wbft.Config)
wbft.SetConfigFromChainConfig(config, genesis.Config)
config.BlockPeriod = 1
config.Epoch = 100
config.RequestTimeout = 2000
config.MaxRequestTimeoutSeconds = 2
config.AllowedFutureBlockTime = 100000000 // to skip future block check; this makes block creation time to be very short

addr := crypto.PubkeyToAddress(nodeKey.PublicKey)
memDB := rawdb.NewMemoryDatabase()
env.addrs[i] = addr
Expand Down Expand Up @@ -251,6 +255,8 @@ func (env *testEnv) makeScenarioEngineDown(index ...int) *scenario {
return &scenario{
target: append([]int{}, index...),
set: func(i int) string {
env.mu.Lock()
defer env.mu.Unlock()
env.down[env.addrs[i]] = true
return fmt.Sprintf("[%d:down]", i)
},
Expand All @@ -261,6 +267,8 @@ func (env *testEnv) makeScenarioEngineUp(index ...int) *scenario {
return &scenario{
target: append([]int{}, index...),
set: func(i int) string {
env.mu.Lock()
defer env.mu.Unlock()
env.down[env.addrs[i]] = false
return fmt.Sprintf("[%d:up]", i)
},
Expand All @@ -271,6 +279,8 @@ func (env *testEnv) makeScenarioDisableCommitMsg(index ...int) *scenario {
return &scenario{
target: append([]int{}, index...),
set: func(i int) string {
env.mu.Lock()
defer env.mu.Unlock()
if env.msgDisabled[env.addrs[i]] == nil {
env.msgDisabled[env.addrs[i]] = make(map[uint64]bool)
}
Expand All @@ -284,6 +294,8 @@ func (env *testEnv) makeScenarioEnableCommitMsg(index ...int) *scenario {
return &scenario{
target: append([]int{}, index...),
set: func(i int) string {
env.mu.Lock()
defer env.mu.Unlock()
if env.msgDisabled[env.addrs[i]] == nil {
env.msgDisabled[env.addrs[i]] = make(map[uint64]bool)
}
Expand All @@ -297,6 +309,8 @@ func (env *testEnv) makeScenarioOnlyOneDown(index int) *scenario {
return &scenario{
target: append([]int{}, index),
set: func(i int) string {
env.mu.Lock()
defer env.mu.Unlock()
for j := 0; j < len(env.addrs); j++ {
if j == i {
env.down[env.addrs[j]] = true
Expand All @@ -314,6 +328,8 @@ func (env *testEnv) makeScenarioRandomDown(index ...int) *scenario {
return &scenario{
target: append([]int{}, index...),
set: func(i int) string {
env.mu.Lock()
defer env.mu.Unlock()
if i == x {
env.down[env.addrs[i]] = true
return fmt.Sprintf("[%d:down]", i)
Expand Down Expand Up @@ -392,11 +408,18 @@ type simPeer struct {
}

func (sp *simPeer) SendWBFTConsensus(msgcode uint64, payload []byte) error {
if (sp.br.env.msgDisabled[sp.br.myAddr] != nil && sp.br.env.msgDisabled[sp.br.myAddr][msgcode]) ||
(sp.br.env.msgDisabled[sp.peerAddr] != nil && sp.br.env.msgDisabled[sp.peerAddr][msgcode]) ||
sp.br.env.down[sp.br.myAddr] || sp.br.env.down[sp.peerAddr] {
env := sp.br.env
env.mu.RLock()
disabledMy := env.msgDisabled[sp.br.myAddr] != nil && env.msgDisabled[sp.br.myAddr][msgcode]
disabledPeer := env.msgDisabled[sp.peerAddr] != nil && env.msgDisabled[sp.peerAddr][msgcode]
downMy := env.down[sp.br.myAddr]
downPeer := env.down[sp.peerAddr]
env.mu.RUnlock()

if disabledMy || disabledPeer || downMy || downPeer {
return nil
}

if err := sp.peerEngine.istanbulEventMux.Post(wbft.MessageEvent{
Code: msgcode,
Payload: payload,
Expand Down
55 changes: 41 additions & 14 deletions consensus/wbft/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,13 @@ type Core struct {
priorState priorState

current *roundState
currentMutex sync.Mutex
currentMutex sync.RWMutex
handlerWg *sync.WaitGroup

roundChangeSet *roundChangeSet
roundChangeTimer *time.Timer
lastSentTimeoutCanceled *bool
timerMu sync.Mutex // protects roundChangeTimer, lastSentTimeoutCanceled, and timer stop/start

retrySendingRoundChangeTimer *time.Timer

Expand Down Expand Up @@ -151,6 +152,8 @@ func (c *Core) GetProposer() common.Address {
}

func (c *Core) IsCurrentProposal(blockHash common.Hash) bool {
c.currentMutex.RLock()
defer c.currentMutex.RUnlock()
return c.current != nil && c.current.pendingRequest != nil && c.current.pendingRequest.Proposal.Hash() == blockHash
}

Expand Down Expand Up @@ -311,20 +314,28 @@ func (c *Core) stopFuturePreprepareTimer() {
c.futurePreprepareTimer.Stop()
}
}

func (c *Core) stopTimer() {
c.stopFuturePreprepareTimer()

// Stop retry sending ROUND-CHANGE retry timer
c.stopRetrySendingRoundChangeTimer()

func (c *Core) stopRoundChangeTimer() {
if c.roundChangeTimer != nil {
c.roundChangeTimer.Stop()
}
}
func (c *Core) cancelLastSentTimeout() {
if c.lastSentTimeoutCanceled != nil {
*c.lastSentTimeoutCanceled = true
}
}
func (c *Core) stopTimer() {
c.timerMu.Lock()
defer c.timerMu.Unlock()
c.stopFuturePreprepareTimer()

// Stop retry sending ROUND-CHANGE retry timer
c.stopRetrySendingRoundChangeTimer()

c.stopRoundChangeTimer()

c.cancelLastSentTimeout()
}

func (c *Core) newRoundChangeTimer() {
c.stopTimer()
Expand Down Expand Up @@ -375,11 +386,13 @@ func (c *Core) newRoundChangeTimer() {
}

c.currentLogger(true, nil).Trace("WBFT: start new ROUND-CHANGE timer", "timeout", timeout.Seconds())
c.timerMu.Lock()
c.lastSentTimeoutCanceled = new(bool)
*c.lastSentTimeoutCanceled = false
c.roundChangeTimer = time.AfterFunc(timeout, func() {
c.sendEvent(timeoutEvent{c.lastSentTimeoutCanceled})
})
c.timerMu.Unlock()
}

// stopRetrySendingRoundChangeTimer stops the round-change retry timer if running.
Expand All @@ -389,18 +402,32 @@ func (c *Core) stopRetrySendingRoundChangeTimer() {
}
}

// newRetrySendingRoundChangeTimer sets a retry timer to reattempt round-change after a timeout
// newRetrySendingRoundChangeTimer sets a retry timer to reattempt round-change after a timeout.
// It copies round (and seq for config) under currentMutex; the callback uses only that copied round and never reads c.current.
func (c *Core) newRetrySendingRoundChangeTimer() {
c.stopRetrySendingRoundChangeTimer()
// Snapshot current view under lock to avoid races with startNewRound/updateRoundState.
var seq, round *big.Int
c.currentMutex.RLock()
if c.current != nil {
seq = new(big.Int).Set(c.current.Sequence())
round = new(big.Int).Set(c.current.Round())
}
c.currentMutex.RUnlock()
if seq == nil || round == nil {
return
}

// set timeout based on the round number
cfg := c.config.GetConfig(c.current.Sequence())
cfg := c.config.GetConfig(seq)
timeout := time.Duration(cfg.RequestTimeout) * time.Millisecond

c.currentLogger(true, nil).Trace("WBFT: set ROUND-CHANGE retry timer", "round", c.current.Round(), "timeout", timeout.Seconds())
c.logger.Trace("WBFT: set ROUND-CHANGE retry timer", "round", round.Uint64(), "timeout", timeout.Seconds())

c.timerMu.Lock()
c.stopRetrySendingRoundChangeTimer()
c.retrySendingRoundChangeTimer = time.AfterFunc(timeout, func() {
c.sendEvent(retryTimeoutEvent{c.current.Round()})
c.sendEvent(retryTimeoutEvent{round})
})
c.timerMu.Unlock()
}

func (c *Core) checkValidatorSignature(data []byte, sig []byte, view wbft.View) (common.Address, error) {
Expand Down
1 change: 1 addition & 0 deletions consensus/wbft/core/extraseal.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func (c *Core) ProcessExtraSeal(lastProposal wbft.Proposal, priorRound *big.Int,

preparedSeal := make([]wbft.SealData, 0)
committedSeal := make([]wbft.SealData, 0)

// latestView is view to process extra seal
latestView := wbft.View{
Round: priorRound,
Expand Down
2 changes: 2 additions & 0 deletions consensus/wbft/core/preprepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ func (c *Core) handlePreprepareMsg(preprepare *wbfmessage.Preprepare) error {
logger.Info("WBFT: PRE-PREPARE block proposal is in the future (will be treated again later)", "duration", duration)

// start a timer to re-input PRE-PREPARE message as a backlog event
c.timerMu.Lock()
c.stopFuturePreprepareTimer()
c.futurePreprepareTimer = time.AfterFunc(duration, func() {
_, validator := c.valSet.GetByAddress(preprepare.Source())
Expand All @@ -152,6 +153,7 @@ func (c *Core) handlePreprepareMsg(preprepare *wbfmessage.Preprepare) error {
msg: preprepare,
})
})
c.timerMu.Unlock()
} else {
logger.Warn("WBFT: invalid PRE-PREPARE block proposal", "err", err)
}
Expand Down
1 change: 1 addition & 0 deletions consensus/wbft/core/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func (c *Core) checkRequestMsg(request *Request) error {
if request == nil || request.Proposal == nil {
return errInvalidMessage
}

if c.current == nil {
return errCurrentIsNil
}
Expand Down
4 changes: 3 additions & 1 deletion consensus/wbft/core/roundstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func (s *roundState) SetRound(r *big.Int) {
s.round = new(big.Int).Set(r)
}

// Returned *big.Int must be treated read-only; never mutate it
func (s *roundState) Round() *big.Int {
s.mu.RLock()
defer s.mu.RUnlock()
Expand All @@ -121,9 +122,10 @@ func (s *roundState) SetSequence(seq *big.Int) {
s.mu.Lock()
defer s.mu.Unlock()

s.sequence = seq
s.sequence = new(big.Int).Set(seq)
}

// Returned *big.Int must be treated read-only; never mutate it
func (s *roundState) Sequence() *big.Int {
s.mu.RLock()
defer s.mu.RUnlock()
Expand Down