
Merge pull request #8 from erigontech/increase_webseed_parallelization
fixed download halts due to timing issues
mh0lt authored May 22, 2024
2 parents 59efe2a + 84f7dea commit ea7c91a
Showing 2 changed files with 79 additions and 67 deletions.
24 changes: 19 additions & 5 deletions torrent.go
@@ -32,6 +32,7 @@ import (
"github.com/anacrolix/sync"
"github.com/pion/datachannel"
"golang.org/x/exp/maps"
"golang.org/x/time/rate"

"github.com/anacrolix/torrent/bencode"
"github.com/anacrolix/torrent/common"
@@ -2587,17 +2588,30 @@ func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) {
// downloading Sintel (08ada5a7a6183aae1e09d831df6748d566095a10) from
// "https://webtorrent.io/torrents/".
const maxRequests = 16

+	// This should affect how often we have to recompute requests for this peer. Note that
+	// because we can request more than 1 thing at a time over HTTP, we will hit the low
+	// requests mark more often, so recomputation is probably sooner than with regular peer
+	// conns. ~4x maxRequests would be about right.
+	peerMaxRequests := 128
+
+	// unless there is more available bandwidth - in which case max requests becomes
+	// the max available pieces that will consume that bandwidth
+
+	if bandwidth := t.cl.config.DownloadRateLimiter.Limit(); bandwidth > 0 && t.info != nil {
+		if maxPieceRequests := int(bandwidth / rate.Limit(t.info.PieceLength)); maxPieceRequests > peerMaxRequests {
+			peerMaxRequests = maxPieceRequests
+		}
+	}

ws := webseedPeer{
peer: Peer{
t: t,
outgoing: true,
Network: "http",
reconciledHandshakeStats: true,
-			// This should affect how often we have to recompute requests for this peer. Note that
-			// because we can request more than 1 thing at a time over HTTP, we will hit the low
-			// requests mark more often, so recomputation is probably sooner than with regular peer
-			// conns. ~4x maxRequests would be about right.
-			PeerMaxRequests: 128,
+			PeerMaxRequests: peerMaxRequests,
// TODO: Set ban prefix?
RemoteAddr: remoteAddrFromUrl(url),
callbacks: t.callbacks(),
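The sizing logic above derives the webseed request budget from the configured download rate limit: the 128-request default only grows when the limit divided by the piece length exceeds it. Below is a minimal, self-contained sketch of that rule using golang.org/x/time/rate; the helper name peerRequestBudget and the example figures are illustrative, not part of the diff.

```go
package main

import (
	"fmt"

	"golang.org/x/time/rate"
)

// peerRequestBudget mirrors the sizing rule added to addWebSeed: start from a
// default of 128 outstanding requests (~4x the webseed maxRequests of 16) and
// raise it when the rate limit can keep more whole pieces in flight at once.
func peerRequestBudget(limit rate.Limit, pieceLength int64) int {
	budget := 128
	if limit > 0 && pieceLength > 0 {
		if maxPieceRequests := int(limit / rate.Limit(pieceLength)); maxPieceRequests > budget {
			budget = maxPieceRequests
		}
	}
	return budget
}

func main() {
	// Hypothetical config: a 1 GiB/s limit over 4 MiB pieces covers 256
	// piece-sized requests per second, so the budget grows past the default.
	fmt.Println(peerRequestBudget(rate.Limit(1<<30), 4<<20)) // 256
	// A 20 MiB/s limit only covers 5 pieces per second; the default stands.
	fmt.Println(peerRequestBudget(rate.Limit(20<<20), 4<<20)) // 128
}
```

The real code additionally guards on t.info != nil, since PieceLength is unknown until the torrent's info is available.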
122 changes: 60 additions & 62 deletions webseed-peer.go
@@ -29,6 +29,7 @@ type webseedPeer struct {
maxActiveRequests int // the max number of active requests for this peer
processedRequests int // the total number of requests this peer has processed
maxRequesters int // the number of requesters to run for this peer
+	waiting int // the number of requesters currently waiting for a signal
requesterCond sync.Cond
updateRequestor *time.Timer
lastUnhandledErr time.Time
@@ -118,9 +119,6 @@ func (ws *webseedPeer) doRequest(r Request) error {
}

func (ws *webseedPeer) requester(i int) {

-	ws.peer.Torrent()._pendingPieces.GetCardinality()

ws.requesterCond.L.Lock()
defer ws.requesterCond.L.Unlock()

@@ -134,28 +132,6 @@ func (ws *webseedPeer) requester(i int) {
return true
}

-		// if there is more than one request in the queue and we don't
-		// have all of the responders activated yet we need to
-		// kick the other requesters into life - otherwise the max parallel
-		// requests will stay below the max - unless some external action happens
-		if pendingRequests := int(ws.peer.requestState.Requests.GetCardinality()); pendingRequests > 1 {
-			pendingRequests--
-			activeCount := len(ws.activeRequests) + 1
-
-			if activeCount < pendingRequests {
-				signals := pendingRequests - activeCount
-
-				if signals > ws.maxRequesters {
-					// max responders excluding this
-					signals = ws.maxRequesters
-				}
-
-				for s := 0; s < signals; s++ {
-					ws.requesterCond.Signal()
-				}
-			}
-		}
-
// note doRequest unlocks ws.requesterCond.L which frees the
// condition to allow other requesters to receive in parallel; it
// will lock again before it returns so the remainder of the code
@@ -190,9 +166,9 @@ func (ws *webseedPeer) requester(i int) {
desiredRequests := len(ws.peer.getDesiredRequestState().Requests.requestIndexes)
pendingRequests := int(ws.peer.requestState.Requests.GetCardinality())

-			ws.peer.logger.Levelf(log.Debug, "%d: requests %d (a=%d,d=%d) active(c=%d,m=%d) complete(%d/%d) restart(%v)",
-				i, ws.processedRequests, pendingRequests, desiredRequests,
-				len(ws.activeRequests), ws.maxActiveRequests, ws.peer.t.numPiecesCompleted(), ws.peer.t.NumPieces(), restart)
+			ws.peer.logger.Levelf(log.Debug, "%d: requests %d (p=%d,d=%d,n=%d) active(c=%d,m=%d,w=%d) complete(%d/%d) restart(%v)",
+				i, ws.processedRequests, pendingRequests, desiredRequests, ws.nominalMaxRequests(),
+				len(ws.activeRequests), ws.maxActiveRequests, ws.waiting, ws.peer.t.numPiecesCompleted(), ws.peer.t.NumPieces(), restart)

-			if pendingRequests > ws.maxRequesters {
+			if pendingRequests > ws.peer.PeerMaxRequests {
@@ -209,6 +185,7 @@ func (ws *webseedPeer) requester(i int) {
}

ws.maxRequesters = pendingRequests
+				ws.requesterCond.Broadcast()
}

}
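Note that the hunk above raises ws.maxRequesters to the pending-request count and then calls Broadcast rather than Signal, so every parked requester re-evaluates at once. The toy below shows that distinction with a bare sync.Cond; the worker pool and pending counter are illustrative assumptions, not code from this repository.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var mu sync.Mutex
	cond := sync.NewCond(&mu)
	pending := 0

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			mu.Lock()
			for pending == 0 {
				cond.Wait() // parked, like a requester with nothing to do
			}
			pending--
			mu.Unlock()
			fmt.Printf("worker %d picked up a request\n", id)
		}(i)
	}

	time.Sleep(100 * time.Millisecond) // crude, demo only: let all four workers park
	mu.Lock()
	pending = 4
	// Signal here would wake a single worker and leave three parked until
	// some later event; Broadcast wakes the whole pool in one shot.
	cond.Broadcast()
	mu.Unlock()
	wg.Wait()
}
```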
@@ -220,64 +197,85 @@ func (ws *webseedPeer) requester(i int) {
}
}

+			ws.waiting++
			ws.requesterCond.Wait()
+			ws.waiting--

if ws.updateRequestor != nil {
ws.updateRequestor.Stop()
ws.updateRequestor = nil
}
+		} else {
+			// if there is more than one request in the queue and we don't
+			// have all of the responders activated yet we need to
+			// kick the other requesters into life - otherwise the max parallel
+			// requests will stay below the max - unless some external action happens
+			if pendingRequests := int(ws.peer.requestState.Requests.GetCardinality()); pendingRequests > 1 {
+				activeCount := len(ws.activeRequests)
+
+				if activeCount < pendingRequests {
+					ws.requesterCond.Broadcast()
+				}
+			}
+		}
}
}

var webpeerUnchokeTimerDuration = 15 * time.Second

func requestUpdate(ws *webseedPeer) {
-	if ws != nil && !ws.peer.closed.IsSet() {
-		numPieces := uint64(ws.peer.t.NumPieces())
-		numCompleted := ws.peer.t._completedPieces.GetCardinality()
-
-		if numCompleted < numPieces {
-			if ws.peer.isLowOnRequests() && time.Since(ws.peer.lastRequestUpdate) > webpeerUnchokeTimerDuration {
-				// if the number of incomplete pieces is less than 16 adjust this peer's
-				// lastUsefulChunkReceived to ensure that it can steal from non-web peers
-				// this is to help ensure completion - we may want to add a head call
-				// before doing this to ensure the peer has the file
-				if numPieces-numCompleted < 16 {
-					lastExistingUseful := ws.peer.lastUsefulChunkReceived
-
-					for piece := pieceIndex(0); piece < pieceIndex(numPieces); piece++ {
-						if ws.peer.t._completedPieces.Contains(uint32(piece)) {
-							continue
-						}
-
-						if existing := ws.peer.t.requestingPeer(RequestIndex(piece)); existing != nil {
-							if existing.connectionFlags() == "WS" {
-								// if the existing client looks like it's not producing timely chunks then
-								// adjust our lastUsefulChunkReceived value to make sure we can steal the
-								// piece from it
-								if time.Since(existing.lastUsefulChunkReceived) > webpeerUnchokeTimerDuration {
-									if !lastExistingUseful.After(existing.lastUsefulChunkReceived) {
-										lastExistingUseful = existing.lastUsefulChunkReceived.Add(time.Minute)
-									}
-								}
-							}
-						}
-					}
-
-					ws.peer.lastUsefulChunkReceived = lastExistingUseful
-				}
-
-				ws.peer.logger.Levelf(log.Debug, "unchoke %d/%d (%s)", ws.processedRequests, ws.peer.t.NumPieces(), ws.peer.lastUsefulChunkReceived)
-
-				ws.peer.updateRequests("unchoked")
-				return
-			}
-		}
-
-		ws.requesterCond.Signal()
-	}
+	if ws != nil {
+		ws.requesterCond.L.Lock()
+		defer ws.requesterCond.L.Unlock()
+
+		ws.updateRequestor = nil
+
+		if !ws.peer.closed.IsSet() {
+			numPieces := uint64(ws.peer.t.NumPieces())
+			numCompleted := ws.peer.t._completedPieces.GetCardinality()
+
+			if numCompleted < numPieces {
+				if ws.peer.isLowOnRequests() && time.Since(ws.peer.lastRequestUpdate) > webpeerUnchokeTimerDuration {
+					// if the number of incomplete pieces is less than 16 adjust this peer's
+					// lastUsefulChunkReceived to ensure that it can steal from non-web peers
+					// this is to help ensure completion - we may want to add a head call
+					// before doing this to ensure the peer has the file
+					if numPieces-numCompleted < 16 {
+						lastExistingUseful := ws.peer.lastUsefulChunkReceived
+
+						for piece := pieceIndex(0); piece < pieceIndex(numPieces); piece++ {
+							if ws.peer.t._completedPieces.Contains(uint32(piece)) {
+								continue
+							}
+
+							if existing := ws.peer.t.requestingPeer(RequestIndex(piece)); existing != nil {
+								if existing.connectionFlags() == "WS" {
+									continue
+								}
+
+								// if the existing client looks like it's not producing timely chunks then
+								// adjust our lastUsefulChunkReceived value to make sure we can steal the
+								// piece from it
+								if time.Since(existing.lastUsefulChunkReceived) > webpeerUnchokeTimerDuration {
+									if !lastExistingUseful.After(existing.lastUsefulChunkReceived) {
+										lastExistingUseful = existing.lastUsefulChunkReceived.Add(time.Minute)
+									}
+								}
+							}
+						}
+
+						ws.peer.lastUsefulChunkReceived = lastExistingUseful
+					}
+
+					ws.peer.logger.Levelf(log.Debug, "unchoke %d/%d maxRequesters=%d, waiting=%d, (%s)", ws.processedRequests, ws.peer.t.NumPieces(), ws.maxRequesters, ws.waiting, ws.peer.lastUsefulChunkReceived)
+
+					ws.peer.updateRequests("unchoked")
+					return
+				}
+			}

+			ws.requesterCond.Signal()
+		}
+	}
}

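Taken together, the webseed-peer.go changes make the idle path self-correcting: requesters count themselves in ws.waiting while parked in Wait, requestUpdate now runs under the condition's lock and clears ws.updateRequestor before deciding whether to kick, and a queue holding more entries than there are active requesters triggers a Broadcast. The condensed sketch below captures that shape; the pool type, its queue field, and the timings are hypothetical stand-ins, not the library's API.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// pool condenses the pattern: requesters park on a cond var and count
// themselves in waiting; a timer-driven update re-checks state under the
// same lock and wakes the whole pool when requests would otherwise stall.
type pool struct {
	mu      sync.Mutex
	cond    *sync.Cond
	queue   []int // pending request indexes (illustrative)
	waiting int   // requesters currently parked in Wait
	closed  bool
}

func (p *pool) requester(id int) {
	p.mu.Lock()
	defer p.mu.Unlock()
	for !p.closed {
		if len(p.queue) == 0 {
			p.waiting++
			p.cond.Wait() // parked until an update (or a peer) wakes us
			p.waiting--
			continue
		}
		r := p.queue[0]
		p.queue = p.queue[1:]
		p.mu.Unlock() // serve without holding the lock, as doRequest does
		fmt.Printf("requester %d served request %d\n", id, r)
		p.mu.Lock()
	}
}

// requestUpdate mirrors the re-armed timer callback: take the lock, check
// whether parked requesters are being wasted, and wake them all at once.
func (p *pool) requestUpdate() {
	p.mu.Lock()
	defer p.mu.Unlock()
	if len(p.queue) > 1 && p.waiting > 0 {
		p.cond.Broadcast()
	}
}

func main() {
	p := &pool{}
	p.cond = sync.NewCond(&p.mu)
	for i := 0; i < 3; i++ {
		go p.requester(i)
	}

	p.mu.Lock()
	p.queue = []int{10, 11, 12} // work arrives while everyone is parked
	p.mu.Unlock()

	timer := time.AfterFunc(50*time.Millisecond, p.requestUpdate)
	defer timer.Stop()

	time.Sleep(200 * time.Millisecond)
	p.mu.Lock()
	p.closed = true
	p.cond.Broadcast() // release parked requesters so they can exit
	p.mu.Unlock()
	time.Sleep(50 * time.Millisecond)
}
```

The property that matters, and that the diff enforces by locking requesterCond.L inside requestUpdate, is that the timer callback and the requesters synchronize on one lock, so a wake-up cannot be lost between a requester's last queue check and its Wait.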
