Skip to content

Commit

Permalink
Merge pull request #38 from erigontech/mtr_ws
Browse files Browse the repository at this point in the history
Fix stuck at cancelled list
  • Loading branch information
mh0lt authored Dec 5, 2024
2 parents 688e150 + fd58b86 commit de18ee1
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 38 deletions.
23 changes: 14 additions & 9 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,7 @@ func (cn *Peer) request(r RequestIndex, maxRequests int, lock bool, lockTorrent
// this is required in case this is a re-request of a previously
// cancelled request - we need to clear the cancelled flag
cn.requestState.Cancelled.Remove(r)

if cn.validReceiveChunks == nil {
cn.validReceiveChunks = make(map[RequestIndex]int)
}
Expand Down Expand Up @@ -606,12 +607,14 @@ func (me *Peer) cancel(r RequestIndex, lock bool, lockTorrent bool) {
if !me.deleteRequest(r, false, false) {
panic(fmt.Sprintf("request %d not existing: should have been guarded", r))
}

if me._cancel(r, false, false) {
// Record that we expect to get a cancel ack.
if !me.requestState.Cancelled.CheckedAdd(r) {
panic(fmt.Sprintf("request %d: already cancelled for hash: %s", r, me.t.InfoHash()))
}
}
}

me.decPeakRequests(false)
}()
}
Expand Down Expand Up @@ -721,20 +724,22 @@ func runSafeExtraneous(f func()) {
}

// Returns true if it was valid to reject the request.
func (c *Peer) remoteRejectedRequest(r RequestIndex) bool {
func (c *Peer) remoteRejectedRequest(r RequestIndex, lock bool, lockTorrent bool) bool {
if !func() bool {
c.t.mu.Lock()
defer c.t.mu.Unlock()
if lockTorrent {
c.t.mu.Lock()
defer c.t.mu.Unlock()
}

c.mu.Lock()
defer c.mu.Unlock()
if lock {
c.mu.Lock()
defer c.mu.Unlock()
}

if c.deleteRequest(r, false, false) {
c.decPeakRequests(false)
} else {
removed := c.requestState.Cancelled.CheckedRemove(r)

if !removed {
if !c.requestState.Cancelled.CheckedRemove(r) {
return false
}
}
Expand Down
2 changes: 1 addition & 1 deletion peerconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1029,7 +1029,7 @@ func (c *PeerConn) mainReadLoop() (err error) {
err = c.peerSentHaveNone(true)
case pp.Reject:
req := newRequestFromMessage(&msg)
if !c.remoteRejectedRequest(c.t.requestIndexFromRequest(req, false)) {
if !c.remoteRejectedRequest(c.t.requestIndexFromRequest(req, false), true, true) {
err = fmt.Errorf("received invalid reject for request %v", req)
c.logger.Levelf(log.Debug, "%v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion request-strategy/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,4 @@ type PeerRequests interface {
IterateSnapshot(func(RequestIndex) bool)

Bitmap() *typedRoaring.Bitmap[RequestIndex]
}
}
2 changes: 1 addition & 1 deletion torrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -3382,7 +3382,7 @@ func (t *Torrent) addWebSeed(url string, lock bool, opts ...AddWebSeedsOpt) {
Url: url,
},
maxRequesters: maxRequests,
activeRequests: make(map[Request]webseed.Request, maxRequests),
activeRequests: make(map[Request]*webseed.Request, maxRequests),
// Limit requests rather than responses - becuase otherwise
// the go http layer buffers causing memory growth
requestRateLimiter: t.cl.config.DownloadRateLimiter,
Expand Down
43 changes: 25 additions & 18 deletions webseed-peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type webseedPeer struct {
// First field for stats alignment.
peer Peer
client webseed.Client
activeRequests map[Request]webseed.Request
activeRequests map[Request]*webseed.Request
maxActiveRequests int // the max number of active requests for this peer
processedRequests int // the total number of requests this peer has processed
maxRequesters int // the number of requester to run for this peer
Expand Down Expand Up @@ -81,7 +81,7 @@ func (ws *webseedPeer) writeInterested(interested bool, lock bool) bool {
}

func (ws *webseedPeer) _cancel(r RequestIndex, lock bool, lockTorrent bool) bool {
if active, ok := func() (active webseed.Request, ok bool) {
if active, ok := func() (active *webseed.Request, ok bool) {
req := ws.peer.t.requestIndexToRequest(r, lockTorrent)

if lock {
Expand All @@ -90,11 +90,15 @@ func (ws *webseedPeer) _cancel(r RequestIndex, lock bool, lockTorrent bool) bool
}

active, ok = ws.activeRequests[req]
return
return
}(); ok {
active.Cancel()
if lock {
ws.peer.mu.RLock()
defer ws.peer.mu.RUnlock()
}

// The requester is running and will handle the result.
return true
return active.Cancel()
}
// There should be no requester handling this, so no further events will occur.
return false
Expand Down Expand Up @@ -156,7 +160,8 @@ func (cn *webseedPeer) nominalMaxRequests(lock bool, lockTorrent bool) maxReques
var limitedBuffPool = storage.NewLimitedBufferPool(bufPool, 5_000_000_000)

func (ws *webseedPeer) doRequest(r Request) error {
webseedRequest := ws.client.NewRequest(ws.intoSpec(r), limitedBuffPool, ws.requestRateLimiter, &ws.receiving)
webseedRequest := ws.client.NewRequest(ws.intoSpec(r), limitedBuffPool, ws.requestRateLimiter, &ws.receiving,
func() { ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r, false), false, false) } )

ws.peer.mu.Lock()
ws.activeRequests[r] = webseedRequest
Expand All @@ -167,17 +172,15 @@ func (ws *webseedPeer) doRequest(r Request) error {
}
ws.peer.mu.Unlock()

err := func() error {
ws.requesterCond.L.Unlock()
defer ws.requesterCond.L.Lock()
return ws.requestResultHandler(r, webseedRequest)
ws.requesterCond.L.Unlock()
defer func() {
ws.requesterCond.L.Lock()
ws.peer.mu.Lock()
delete(ws.activeRequests, r)
ws.peer.mu.Unlock()
}()

ws.peer.mu.Lock()
delete(ws.activeRequests, r)
ws.peer.mu.Unlock()

return err
return ws.requestResultHandler(r, webseedRequest)
}

func (ws *webseedPeer) requester(i int) {
Expand Down Expand Up @@ -530,7 +533,7 @@ func (ws *webseedPeer) onClose(lockTorrent bool) {
ws.requesterCond.Broadcast()
}

func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) error {
func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest *webseed.Request) error {
result := <-webseedRequest.Result
close(webseedRequest.Result) // one-shot

Expand All @@ -540,6 +543,10 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re
}
}()

webseedRequest.Lock()
webseedRequest.Result = nil
webseedRequest.Unlock()

ws.persisting.Add(1)
defer ws.persisting.Add(-1)

Expand All @@ -561,8 +568,8 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re
if len(piece) != 0 || result.Err == nil {
// Increment ChunksRead and friends
ws.peer.doChunkReadStats(int64(len(piece)))
ws.peer.readBytes(int64(len(piece)))
}
ws.peer.readBytes(int64(len(piece)))

if ws.peer.t.closed.IsSet() {
return nil
Expand Down Expand Up @@ -592,7 +599,7 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re
}
}

if !ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r, true)) {
if !ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r, true), true, true) {
err = fmt.Errorf(`received invalid reject "%w", for request %v`, err, r)
ws.peer.logger.Levelf(log.Debug, "%v", err)
}
Expand Down
29 changes: 21 additions & 8 deletions webseed/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"log"
"net/http"
"strings"
"sync"
"sync/atomic"

"github.com/RoaringBitmap/roaring"
Expand All @@ -30,13 +31,24 @@ type requestPart struct {
}

type Request struct {
cancel func()
Result chan RequestResult
readers []io.Reader
sync.Mutex
cancel func()
onCancelled func()
Result chan RequestResult
}

func (r Request) Cancel() {
func (r *Request) Cancel() bool {
r.cancel()

r.Lock()
hasResult := r.Result != nil
r.Unlock()

if !hasResult {
r.onCancelled()
}

return hasResult
}

type Client struct {
Expand Down Expand Up @@ -72,7 +84,7 @@ type RequestResult struct {
Err error
}

func (ws *Client) NewRequest(r RequestSpec, buffers storage.BufferPool, limiter *rate.Limiter, receivingCounter *atomic.Int64) Request {
func (ws *Client) NewRequest(r RequestSpec, buffers storage.BufferPool, limiter *rate.Limiter, receivingCounter *atomic.Int64, onCancelled func()) *Request {
ctx, cancel := context.WithCancel(context.Background())
var requestParts []requestPart
if !ws.fileIndex.Locate(r, func(i int, e segments.Extent) bool {
Expand Down Expand Up @@ -115,9 +127,10 @@ func (ws *Client) NewRequest(r RequestSpec, buffers storage.BufferPool, limiter
}) {
panic("request out of file bounds")
}
req := Request{
cancel: cancel,
Result: make(chan RequestResult, 1),
req := &Request{
cancel: cancel,
onCancelled: onCancelled,
Result: make(chan RequestResult, 1),
}
go func() {
readers, err := readRequestPartResponses(ctx, requestParts, receivingCounter)
Expand Down

0 comments on commit de18ee1

Please sign in to comment.