Skip to content

Commit

Permalink
Merge pull request #6 from erigontech/increase_webseed_parallelization
Browse files Browse the repository at this point in the history
update signaling to increase the potential active handlers
  • Loading branch information
mh0lt authored May 6, 2024
2 parents c5a9913 + c4c1a77 commit 089918d
Showing 1 changed file with 105 additions and 22 deletions.
127 changes: 105 additions & 22 deletions webseed-peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ const (

type webseedPeer struct {
// First field for stats alignment.
peer Peer
client webseed.Client
activeRequests map[Request]webseed.Request
requesterCond sync.Cond
updateRequestor *time.Timer
lastUnhandledErr time.Time
peer Peer
client webseed.Client
activeRequests map[Request]webseed.Request
maxActiveRequests int
processedRequests int
requesterCond sync.Cond
updateRequestor *time.Timer
lastUnhandledErr time.Time
}

var _ peerImpl = (*webseedPeer)(nil)
Expand Down Expand Up @@ -69,7 +71,7 @@ func (ws *webseedPeer) _cancel(r RequestIndex) bool {
}

func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec {
return webseed.RequestSpec{ws.peer.t.requestOffset(r), int64(r.Length)}
return webseed.RequestSpec{Start: ws.peer.t.requestOffset(r), Length: int64(r.Length)}
}

func (ws *webseedPeer) _request(r Request) bool {
Expand All @@ -79,6 +81,9 @@ func (ws *webseedPeer) _request(r Request) bool {
func (ws *webseedPeer) doRequest(r Request) error {
webseedRequest := ws.client.NewRequest(ws.intoSpec(r))
ws.activeRequests[r] = webseedRequest
if activeLen := len(ws.activeRequests); activeLen > ws.maxActiveRequests {
ws.maxActiveRequests = activeLen
}
err := func() error {
ws.requesterCond.L.Unlock()
defer ws.requesterCond.L.Lock()
Expand All @@ -89,6 +94,9 @@ func (ws *webseedPeer) doRequest(r Request) error {
}

func (ws *webseedPeer) requester(i int) {

ws.peer.Torrent()._pendingPieces.GetCardinality()

ws.requesterCond.L.Lock()
defer ws.requesterCond.L.Unlock()

Expand All @@ -102,12 +110,45 @@ func (ws *webseedPeer) requester(i int) {
return true
}

// if there are more than one requests in the queue and we don't
// have all of the responders activated yet we need to
// kick the other requestors into life - otherwise the max parallel
// requests will stay below the max - unless some external action happens
if pendingRequests := int(ws.peer.requestState.Requests.GetCardinality()); pendingRequests > 1 {
pendingRequests--
activeCount := len(ws.activeRequests) + 1

if activeCount < pendingRequests {
signals := pendingRequests - activeCount

if signals > 15 {
// max responders excluding this
signals = 15
}

for s := 0; s < signals; s++ {
ws.requesterCond.Signal()
}
}
}

// note doRequest unlocks ws.requesterCond.L which free the
// condition to allow other requestors to receive in parallel it
// will lock again before it returns so the remainder of the code
// here can assume it has a lock
err := ws.doRequest(r)
ws.requesterCond.L.Unlock()
if err != nil && !errors.Is(err, context.Canceled) {
ws.peer.logger.Printf("requester %v: error doing webseed request %v: %v", i, r, err)

if err == nil {
ws.processedRequests++
restart = ws.peer.requestState.Requests.GetCardinality() > 0
return false
}
restart = true

if !errors.Is(err, context.Canceled) {
ws.peer.logger.Levelf(log.Debug, "requester %v: error doing webseed request %v: %v", i, r, err)
}

ws.requesterCond.L.Unlock()
if errors.Is(err, webseed.ErrTooFast) {
time.Sleep(time.Duration(rand.Int63n(int64(10 * time.Second))))
}
Expand All @@ -117,13 +158,20 @@ func (ws *webseedPeer) requester(i int) {
ws.peer.t.cl.locker().RUnlock()
time.Sleep(duration)
ws.requesterCond.L.Lock()
restart = ws.peer.requestState.Requests.GetCardinality() > 0
return false
})

if !(ws.peer.t.dataDownloadDisallowed.Bool() || ws.peer.t.info == nil) {
ws.peer.logger.Levelf(log.Debug, "%d: requests %d (a=%d,d=%d) active(c=%d,m=%d) complete(%d/%d) restart(%v)",
i, ws.processedRequests, ws.peer.requestState.Requests.GetCardinality(), len(ws.peer.getDesiredRequestState().Requests.requestIndexes),
len(ws.activeRequests), ws.maxActiveRequests, ws.peer.t._completedPieces.GetCardinality(), ws.peer.t.NumPieces(), restart)
}

if !restart {
if !ws.peer.t.dataDownloadDisallowed.Bool() && ws.peer.isLowOnRequests() && len(ws.peer.getDesiredRequestState().Requests.requestIndexes) > 0 {
if !(ws.peer.t.dataDownloadDisallowed.Bool() || ws.peer.t.Complete.Bool()) {
if ws.updateRequestor == nil {
ws.updateRequestor = time.AfterFunc(updateRequestsTimerDuration, func() { requestUpdate(ws) })
ws.updateRequestor = time.AfterFunc(webpeerUnchokeTimerDuration, func() { requestUpdate(ws) })
}
}

Expand All @@ -137,20 +185,54 @@ func (ws *webseedPeer) requester(i int) {
}
}

var webpeerUnchokeTimerDuration = 15 * time.Second

func requestUpdate(ws *webseedPeer) {
if ws != nil {
if !ws.peer.closed.IsSet() {
if len(ws.peer.getDesiredRequestState().Requests.requestIndexes) > 0 {
if ws.peer.isLowOnRequests() {
if time.Since(ws.peer.lastRequestUpdate) > updateRequestsTimerDuration {
ws.peer.updateRequests(peerUpdateRequestsTimerReason)
return
if ws != nil && !ws.peer.closed.IsSet() {
numPieces := uint64(ws.peer.t.NumPieces())
numCompleted := ws.peer.t._completedPieces.GetCardinality()

if numCompleted < numPieces {
if ws.peer.isLowOnRequests() && time.Since(ws.peer.lastRequestUpdate) > webpeerUnchokeTimerDuration {
// if the number of incomplete pieces is less than five adjust this peers
// lastUsefulChunkReceived to ensure that it can steal from non web peers
// this is to help ensure completion - we may want to add a head call
// before doing this to ensure the peer has the file
if numPieces-numCompleted < 16 {
lastExistingUseful := ws.peer.lastUsefulChunkReceived

for piece := pieceIndex(0); piece < pieceIndex(numPieces); piece++ {
if ws.peer.t._completedPieces.Contains(uint32(piece)) {
continue
}

if existing := ws.peer.t.requestingPeer(RequestIndex(piece)); existing != nil {
if existing.connectionFlags() == "WS" {
continue
}

// if the existing client looks like its not producing timely chunks then
// adjust our lastUsefulChunkReceived value to make sure we can steal the
// piece from it
if time.Since(existing.lastUsefulChunkReceived) > webpeerUnchokeTimerDuration {
if !lastExistingUseful.After(existing.lastUsefulChunkReceived) {
lastExistingUseful = existing.lastUsefulChunkReceived.Add(time.Minute)
}
}
}
}

ws.peer.lastUsefulChunkReceived = lastExistingUseful
}

ws.requesterCond.Signal()
ws.peer.logger.Levelf(log.Debug, "unchoke %d/%d (%s)", ws.processedRequests, ws.peer.t.NumPieces(), ws.peer.lastUsefulChunkReceived)

ws.peer.updateRequests("unchoked")
return
}
}

ws.requesterCond.Signal()
}
}

Expand Down Expand Up @@ -219,7 +301,7 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re
// cfg.Dump(result.Err)

if webseedPeerCloseOnUnhandledError {
log.Printf("closing %v", ws)
log.Levelf(log.Debug, "closing %v", ws)
ws.peer.close()
} else {
ws.lastUnhandledErr = time.Now()
Expand All @@ -230,6 +312,7 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re
}
return err
}

err = ws.peer.receiveChunk(&pp.Message{
Type: pp.Piece,
Index: r.Index,
Expand Down

0 comments on commit 089918d

Please sign in to comment.