Skip to content

Commit

Permalink
concurrent routingQuery
Browse files Browse the repository at this point in the history
  • Loading branch information
Ji Qiren committed May 31, 2022
1 parent 201786e commit cea54f2
Showing 1 changed file with 51 additions and 27 deletions.
78 changes: 51 additions & 27 deletions p2p/peer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,7 @@ func (pm *PeerManager) NeighborCount(direction connDirection) int {
pm.neighborMutex.RLock()
defer pm.neighborMutex.RUnlock()

//ilog.Infof("NeighborCount %v %v", direction, pm.neighborCount[direction])
return pm.neighborCount[direction]
}

Expand Down Expand Up @@ -517,45 +518,68 @@ func (pm *PeerManager) routingQuery(ids []string) {
}

pm.Broadcast(bytes, RoutingTableQuery, UrgentMessage)
outboundNeighborCount := pm.NeighborCount(outbound)
if outboundNeighborCount >= pm.neighborCap[outbound] {
return
}
allPeerIDs := pm.routingTable.ListPeers()
r := rand.New(rand.NewSource(time.Now().Unix()))
perm := r.Perm(len(allPeerIDs))

for i, t := 0, 0; i < len(perm) && t < pm.neighborCap[outbound]-outboundNeighborCount; i++ {
i := 0
for {
if pm.isStopped() {
ilog.Infof("peer manager stopped, stop routingQuery %v", ids)
return
}

peerID := allPeerIDs[perm[i]]
if peerID == pm.host.ID() {
continue
}
if pm.GetNeighbor(peerID) != nil {
continue
if pm.NeighborCount(outbound) >= pm.neighborCap[outbound] {
return
}
ilog.Debugf("dial peer: pid=%v", peerID.Pretty())
stream, err := pm.newStream(peerID)
if err != nil {
ilog.Warnf("create stream failed. pid=%s, err=%v", peerID.Pretty(), err)

if strings.Contains(err.Error(), "connected to wrong peer") {
pm.deletePeerInfo(peerID)
maxConcurrentQueryNum := pm.neighborCap[outbound] - pm.NeighborCount(outbound)
queryingPeerIDs := []PeerID{}
for ; i < len(perm); i++ {
peerID := allPeerIDs[perm[i]]
if peerID == pm.host.ID() {
continue
}

pm.recordDialFail(peerID)
if pm.isDead(peerID) {
pm.deletePeerInfo(peerID)
if pm.GetNeighbor(peerID) != nil {
continue
}
continue
queryingPeerIDs = append(queryingPeerIDs, peerID)
if len(queryingPeerIDs) >= maxConcurrentQueryNum {
break
}
}
if len(queryingPeerIDs) == 0 {
ilog.Warnf("cannot make routingQuery request")
break
}

var wg sync.WaitGroup
wg.Add(len(queryingPeerIDs))
ilog.Infof("make %v routingQuery request. Current outbound neighbor count: %v", len(queryingPeerIDs), pm.NeighborCount(outbound))
for _, peerID := range queryingPeerIDs {
peerID := peerID
go func(peerId PeerID) {
defer wg.Done()
ilog.Debugf("dial peer: pid=%v", peerID.Pretty())
stream, err := pm.newStream(peerID)
if err != nil {
ilog.Warnf("create stream failed. pid=%s, err=%v", peerID.Pretty(), err)

if strings.Contains(err.Error(), "connected to wrong peer") {
pm.deletePeerInfo(peerID)
return
}

pm.recordDialFail(peerID)
if pm.isDead(peerID) {
pm.deletePeerInfo(peerID)
}
return
}
pm.HandleStream(stream, outbound)
pm.SendToPeer(peerID, bytes, RoutingTableQuery, UrgentMessage)
}(peerID)
}
pm.HandleStream(stream, outbound)
pm.SendToPeer(peerID, bytes, RoutingTableQuery, UrgentMessage)
t++
wg.Wait()
ilog.Infof("finished %v routingQuery request. Current outbound neighbor count: %v", len(queryingPeerIDs), pm.NeighborCount(outbound))
}
}

Expand Down

0 comments on commit cea54f2

Please sign in to comment.