diff --git a/p2p/peer_manager.go b/p2p/peer_manager.go index 7ad4728e1..7c5deedbe 100644 --- a/p2p/peer_manager.go +++ b/p2p/peer_manager.go @@ -420,6 +420,7 @@ func (pm *PeerManager) NeighborCount(direction connDirection) int { pm.neighborMutex.RLock() defer pm.neighborMutex.RUnlock() + //ilog.Infof("NeighborCount %v %v", direction, pm.neighborCount[direction]) return pm.neighborCount[direction] } @@ -517,45 +518,68 @@ func (pm *PeerManager) routingQuery(ids []string) { } pm.Broadcast(bytes, RoutingTableQuery, UrgentMessage) - outboundNeighborCount := pm.NeighborCount(outbound) - if outboundNeighborCount >= pm.neighborCap[outbound] { - return - } allPeerIDs := pm.routingTable.ListPeers() r := rand.New(rand.NewSource(time.Now().Unix())) perm := r.Perm(len(allPeerIDs)) - for i, t := 0, 0; i < len(perm) && t < pm.neighborCap[outbound]-outboundNeighborCount; i++ { + i := 0 + for { if pm.isStopped() { + ilog.Infof("peer manager stopped, stop routingQuery %v", ids) return } - - peerID := allPeerIDs[perm[i]] - if peerID == pm.host.ID() { - continue - } - if pm.GetNeighbor(peerID) != nil { - continue + if pm.NeighborCount(outbound) >= pm.neighborCap[outbound] { + return } - ilog.Debugf("dial peer: pid=%v", peerID.Pretty()) - stream, err := pm.newStream(peerID) - if err != nil { - ilog.Warnf("create stream failed. pid=%s, err=%v", peerID.Pretty(), err) - - if strings.Contains(err.Error(), "connected to wrong peer") { - pm.deletePeerInfo(peerID) + maxConcurrentQueryNum := pm.neighborCap[outbound] - pm.NeighborCount(outbound) + queryingPeerIDs := []PeerID{} + for ; i < len(perm); i++ { + peerID := allPeerIDs[perm[i]] + if peerID == pm.host.ID() { continue } - - pm.recordDialFail(peerID) - if pm.isDead(peerID) { - pm.deletePeerInfo(peerID) + if pm.GetNeighbor(peerID) != nil { + continue } - continue + queryingPeerIDs = append(queryingPeerIDs, peerID) + if len(queryingPeerIDs) >= maxConcurrentQueryNum { + break + } + } + if len(queryingPeerIDs) == 0 { + ilog.Warnf("cannot make routingQuery request") + break + } + + var wg sync.WaitGroup + wg.Add(len(queryingPeerIDs)) + ilog.Infof("make %v routingQuery request. Current outbound neighbor count: %v", len(queryingPeerIDs), pm.NeighborCount(outbound)) + for _, peerID := range queryingPeerIDs { + peerID := peerID + go func(peerId PeerID) { + defer wg.Done() + ilog.Debugf("dial peer: pid=%v", peerID.Pretty()) + stream, err := pm.newStream(peerID) + if err != nil { + ilog.Warnf("create stream failed. pid=%s, err=%v", peerID.Pretty(), err) + + if strings.Contains(err.Error(), "connected to wrong peer") { + pm.deletePeerInfo(peerID) + return + } + + pm.recordDialFail(peerID) + if pm.isDead(peerID) { + pm.deletePeerInfo(peerID) + } + return + } + pm.HandleStream(stream, outbound) + pm.SendToPeer(peerID, bytes, RoutingTableQuery, UrgentMessage) + }(peerID) } - pm.HandleStream(stream, outbound) - pm.SendToPeer(peerID, bytes, RoutingTableQuery, UrgentMessage) - t++ + wg.Wait() + ilog.Infof("finished %v routingQuery request. Current outbound neighbor count: %v", len(queryingPeerIDs), pm.NeighborCount(outbound)) } }