Skip to content

Commit c46725e

Browse files
authored
Merge pull request #1758 from 0chain/feat/retry-refpath
Add retries for refpath request
2 parents 5c92a15 + bdbea0c commit c46725e

File tree

5 files changed

+98
-36
lines changed

5 files changed

+98
-36
lines changed

zboxcore/sdk/allocation.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -422,7 +422,7 @@ func SetDownloadWorkerCount(count int) {
422422

423423
// InitAllocation initializes the allocation.
424424
func (a *Allocation) InitAllocation() {
425-
a.downloadChan = make(chan *DownloadRequest, 100)
425+
a.downloadChan = make(chan *DownloadRequest, 400)
426426
a.repairChan = make(chan *RepairRequest, 1)
427427
a.ctx, a.ctxCancelF = context.WithCancel(context.Background())
428428
a.downloadProgressMap = make(map[string]*DownloadRequest)

zboxcore/sdk/commitworker.go

+79-31
Original file line numberDiff line numberDiff line change
@@ -471,6 +471,7 @@ func (commitReq *CommitRequestV2) processCommit() {
471471
return
472472
}
473473
if commitReq.commitMask.CountOnes() < commitReq.consensusThresh {
474+
commitReq.commitMask = zboxutil.NewUint128(0)
474475
commitReq.result = ErrorCommitResult("Failed to get reference path")
475476
return
476477
}
@@ -624,41 +625,78 @@ func getReferencePathV2(blobber *blockchain.StorageNode, allocationID, allocatio
624625
return trie, nil
625626
}
626627
now := time.Now()
627-
req, err := zboxutil.NewReferencePathRequestV2(blobber.Baseurl, allocationID, allocationTx, sig, paths, false)
628-
if err != nil {
629-
l.Logger.Error("Creating ref path req", err)
630-
return nil, err
631-
}
632-
var lR ReferencePathResultV2
633-
ctx, cncl := context.WithTimeout(context.Background(), (time.Second * 30))
634-
err = zboxutil.HttpDo(ctx, cncl, req, func(resp *http.Response, err error) error {
635-
if err != nil {
636-
l.Logger.Error("Ref path error:", err)
637-
return err
638-
}
639-
defer resp.Body.Close()
640-
respBody, err := io.ReadAll(resp.Body)
641-
if err != nil {
642-
l.Logger.Error("Ref path: Resp", err)
643-
return err
644-
}
645-
if resp.StatusCode != http.StatusOK {
646-
return errors.New(
647-
strconv.Itoa(resp.StatusCode),
648-
fmt.Sprintf("Reference path error response: Status: %d - %s ",
649-
resp.StatusCode, string(respBody)))
650-
}
651-
err = json.Unmarshal(respBody, &lR)
652-
if err != nil {
653-
l.Logger.Error("Reference path json decode error: ", err)
654-
return err
655-
}
656-
return nil
657-
})
628+
var (
629+
shouldContinue bool
630+
err error
631+
lR ReferencePathResultV2
632+
)
633+
for retries := 0; retries < 3; retries++ {
634+
err, shouldContinue = func() (err error, shouldContinue bool) {
635+
var req *http.Request
636+
req, err = zboxutil.NewReferencePathRequestV2(blobber.Baseurl, allocationID, allocationTx, sig, paths, false)
637+
if err != nil {
638+
l.Logger.Error("Creating ref path req", err)
639+
return
640+
}
658641

642+
ctx, cncl := context.WithTimeout(context.Background(), (time.Second * 30))
643+
err = zboxutil.HttpDo(ctx, cncl, req, func(resp *http.Response, err error) error {
644+
if err != nil {
645+
l.Logger.Error("Ref path error:", err)
646+
if errors.Is(err, http.ErrServerClosed) || strings.Contains(err.Error(), "GOAWAY") {
647+
shouldContinue = true
648+
}
649+
return err
650+
}
651+
defer resp.Body.Close()
652+
respBody, err := io.ReadAll(resp.Body)
653+
if err != nil {
654+
l.Logger.Error("Ref path: Resp", err)
655+
if strings.Contains(err.Error(), "GOAWAY") || errors.Is(err, io.ErrUnexpectedEOF) {
656+
shouldContinue = true
657+
}
658+
return err
659+
}
660+
if resp.StatusCode != http.StatusOK {
661+
if resp.StatusCode == http.StatusTooManyRequests {
662+
logger.Logger.Debug(blobber.Baseurl,
663+
" got too many request error. Retrying")
664+
665+
var r int
666+
r, err = zboxutil.GetRateLimitValue(resp)
667+
if err != nil {
668+
logger.Logger.Error(err)
669+
return err
670+
}
671+
672+
time.Sleep(time.Duration(r) * time.Second)
673+
shouldContinue = true
674+
return err
675+
}
676+
return errors.New(
677+
strconv.Itoa(resp.StatusCode),
678+
fmt.Sprintf("Reference path error response: Status: %d - %s ",
679+
resp.StatusCode, string(respBody)))
680+
}
681+
err = json.Unmarshal(respBody, &lR)
682+
if err != nil {
683+
l.Logger.Error("Reference path json decode error: ", err)
684+
return err
685+
}
686+
return nil
687+
})
688+
return
689+
}()
690+
if shouldContinue {
691+
continue
692+
} else {
693+
break
694+
}
695+
}
659696
if err != nil {
660697
return nil, err
661698
}
699+
662700
elapsedRefPath := time.Since(now)
663701
mu.Lock()
664702
defer mu.Unlock()
@@ -725,6 +763,9 @@ func submitWriteMarker(wmData, metaData []byte, blobber *blockchain.StorageNode,
725763
respBody, err = io.ReadAll(resp.Body)
726764
if err != nil {
727765
logger.Logger.Error("Response read: ", err)
766+
if errors.Is(err, io.ErrUnexpectedEOF) {
767+
shouldContinue = true
768+
}
728769
return
729770
}
730771
if resp.StatusCode == http.StatusOK {
@@ -764,6 +805,13 @@ func submitWriteMarker(wmData, metaData []byte, blobber *blockchain.StorageNode,
764805
return
765806
}
766807

808+
trimmed := strings.TrimSpace(string(respBody))
809+
if strings.HasPrefix(trimmed, "<html>") {
810+
time.Sleep(5 * time.Second)
811+
shouldContinue = true
812+
return
813+
}
814+
767815
err = thrown.New("commit_error",
768816
fmt.Sprintf("Got error response %s with status %d", respBody, resp.StatusCode))
769817
return

zboxcore/sdk/downloadworker.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -737,7 +737,7 @@ func (req *DownloadRequest) processDownload() {
737737
if startBlock+int64(j)*numBlocks+numBlocks > endBlock {
738738
blocksToDownload = endBlock - (startBlock + int64(j)*numBlocks)
739739
}
740-
data, err := req.getBlocksData(startBlock+int64(j)*numBlocks, blocksToDownload, j == 0)
740+
data, err := req.getBlocksData(startBlock+int64(j)*numBlocks, blocksToDownload, j == 0 && shouldTimeRequest)
741741
if req.isDownloadCanceled {
742742
return errors.New("download_abort", "Download aborted by user")
743743
}
@@ -792,7 +792,7 @@ func (req *DownloadRequest) processDownload() {
792792
wg.Wait()
793793
// req.fileHandler.Sync() //nolint
794794
elapsedGetBlocksAndWrite := time.Since(now) - elapsedInitEC - elapsedInitEncryption
795-
l.Logger.Debug(fmt.Sprintf("[processDownload] Timings:\n allocation_id: %s,\n remotefilepath: %s,\n initEC: %d ms,\n initEncryption: %d ms,\n getBlocks and writes: %d ms",
795+
l.Logger.Info(fmt.Sprintf("[processDownload] Timings:\n allocation_id: %s,\n remotefilepath: %s,\n initEC: %d ms,\n initEncryption: %d ms,\n getBlocks and writes: %d ms",
796796
req.allocationID,
797797
req.remotefilepath,
798798
elapsedInitEC.Milliseconds(),

zboxcore/sdk/sdk.go

+5
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ var (
5454
networkWorkerTimerInHours = 1 //nolint:unused
5555
singleClientMode = false
5656
shouldVerifyHash = true
57+
shouldTimeRequest = true
5758
)
5859

5960
func SetSingleClietnMode(mode bool) {
@@ -64,6 +65,10 @@ func SetShouldVerifyHash(verify bool) {
6465
shouldVerifyHash = verify
6566
}
6667

68+
func SetShouldTimeRequest(timeRequest bool) {
69+
shouldTimeRequest = timeRequest
70+
}
71+
6772
func SetSaveProgress(save bool) {
6873
shouldSaveProgress = save
6974
}

zboxcore/zboxutil/download_buffer.go

+11-2
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,10 @@ func (r *DownloadBufferWithMask) RequestChunk(ctx context.Context, num int) []by
105105
default:
106106
}
107107
r.mu.Lock()
108+
if r.downloadBuf == nil {
109+
r.mu.Unlock()
110+
return nil
111+
}
108112
isSet := r.mask & (1 << num)
109113
// already assigned
110114
if isSet == 0 {
@@ -137,9 +141,14 @@ func (r *DownloadBufferWithMask) ReleaseChunk(num int) {
137141
}
138142

139143
func (r *DownloadBufferWithMask) ClearBuffer() {
140-
for _, buff := range r.downloadBuf {
144+
r.mu.Lock()
145+
defer r.mu.Unlock()
146+
for num, buff := range r.downloadBuf {
141147
if buff != nil {
142-
BufferPool.Put(buff)
148+
isSet := r.mask & (1 << num)
149+
if isSet != 0 {
150+
BufferPool.Put(buff)
151+
}
143152
}
144153
}
145154
r.downloadBuf = nil

0 commit comments

Comments
 (0)