@@ -48,7 +48,7 @@ struct curlFileTransfer : public FileTransfer
4848 std::random_device rd;
4949 std::mt19937 mt19937;
5050
51- struct TransferItem : public std ::enable_shared_from_this<TransferItem>
51+ struct TransferItem : public std ::enable_shared_from_this<TransferItem>, public FileTransfer::Item
5252 {
5353 curlFileTransfer & fileTransfer;
5454 FileTransferRequest request;
@@ -60,6 +60,7 @@ struct curlFileTransfer : public FileTransfer
6060 // buffer to accompany the `req` above
6161 char errbuf[CURL_ERROR_SIZE];
6262 bool active = false ; // whether the handle has been added to the multi object
63+ bool paused = false ; // whether the request has been paused previously
6364 std::string statusMsg;
6465
6566 unsigned int attempt = 0 ;
@@ -116,7 +117,13 @@ struct curlFileTransfer : public FileTransfer
116117 successful response. */
117118 if (successfulStatuses.count (httpStatus)) {
118119 writtenToSink += data.size ();
119- this ->request .dataCallback (data);
120+ PauseTransfer needsPause = this ->request .dataCallback (data);
121+ if (needsPause == PauseTransfer::Yes) {
122+ /* Smuggle the boolean flag into writeCallback. Note that
123+ the finalSink might get called multiple times if there's
124+ decompression going on. */
125+ paused = true ;
126+ }
120127 }
121128 } else
122129 this ->result .data .append (data);
@@ -195,6 +202,14 @@ struct curlFileTransfer : public FileTransfer
195202 }
196203
197204 (*decompressionSink)({(char *) contents, realSize});
205+ if (paused) {
206+ /* The callback has signaled that the transfer needs to be
207+ paused. Already consumed data won't be returned twice unlike
208+ when returning CURL_WRITEFUNC_PAUSE.
209+ https://curl-library.cool.haxx.narkive.com/larE1cRA/curl-easy-pause-documentation-question
210+ */
211+ curl_easy_pause (req, CURLPAUSE_RECV);
212+ }
198213
199214 return realSize;
200215 } catch (...) {
@@ -364,6 +379,15 @@ struct curlFileTransfer : public FileTransfer
364379 return ((TransferItem *) clientp)->seekCallback (offset, origin);
365380 }
366381
382+ void unpause ()
383+ {
384+ /* Unpausing an already unpaused transfer is a no-op. */
385+ if (paused) {
386+ curl_easy_pause (req, CURLPAUSE_CONT);
387+ paused = false ;
388+ }
389+ }
390+
367391 void init ()
368392 {
369393 if (!req)
@@ -624,7 +648,7 @@ struct curlFileTransfer : public FileTransfer
624648 errorSink.reset ();
625649 embargo = std::chrono::steady_clock::now () + std::chrono::milliseconds (ms);
626650 try {
627- fileTransfer.enqueueItem (shared_from_this ());
651+ fileTransfer.enqueueItem (ref{ shared_from_this ()} );
628652 } catch (const nix::Error & e) {
629653 // If enqueue fails (e.g., during shutdown), fail the transfer properly
630654 // instead of letting the exception propagate, which would leave done=false
@@ -641,24 +665,24 @@ struct curlFileTransfer : public FileTransfer
641665 {
642666 struct EmbargoComparator
643667 {
644- bool operator ()(const std::shared_ptr <TransferItem> & i1, const std::shared_ptr <TransferItem> & i2)
668+ bool operator ()(const ref <TransferItem> & i1, const ref <TransferItem> & i2)
645669 {
646670 return i1->embargo > i2->embargo ;
647671 }
648672 };
649673
650- std::
651- priority_queue<std::shared_ptr<TransferItem>, std::vector<std::shared_ptr<TransferItem>>, EmbargoComparator>
652- incoming;
674+ std::priority_queue<ref<TransferItem>, std::vector<ref<TransferItem>>, EmbargoComparator> incoming;
675+ std::vector<ref<TransferItem>> unpause;
653676 private:
654677 bool quitting = false ;
655678 public:
656679 void quit ()
657680 {
658681 quitting = true ;
659- /* We wil not be processing any more incoming requests */
682+ /* We will not be processing any more incoming requests */
660683 while (!incoming.empty ())
661684 incoming.pop ();
685+ unpause.clear ();
662686 }
663687
664688 bool isQuitting ()
@@ -827,6 +851,17 @@ struct curlFileTransfer : public FileTransfer
827851 item->active = true ;
828852 items[item->req ] = item;
829853 }
854+
855+ /* NOTE: Unpausing may invoke callbacks to flush all buffers. */
856+ auto unpause = [&]() {
857+ auto state (state_.lock ());
858+ auto res = state->unpause ;
859+ state->unpause .clear ();
860+ return res;
861+ }();
862+
863+ for (auto & item : unpause)
864+ item->unpause ();
830865 }
831866
832867 debug (" download thread shutting down" );
@@ -851,7 +886,7 @@ struct curlFileTransfer : public FileTransfer
851886 }
852887 }
853888
854- void enqueueItem (std::shared_ptr <TransferItem> item)
889+ ItemHandle enqueueItem (ref <TransferItem> item)
855890 {
856891 if (item->request .data && item->request .uri .scheme () != " http" && item->request .uri .scheme () != " https"
857892 && item->request .uri .scheme () != " s3" )
@@ -866,19 +901,34 @@ struct curlFileTransfer : public FileTransfer
866901#ifndef _WIN32 // TODO need graceful async exit support on Windows?
867902 writeFull (wakeupPipe.writeSide .get (), " " );
868903#endif
904+
905+ return ItemHandle (static_cast <Item &>(*item));
869906 }
870907
871- void enqueueFileTransfer (const FileTransferRequest & request, Callback<FileTransferResult> callback) override
908+ ItemHandle enqueueFileTransfer (const FileTransferRequest & request, Callback<FileTransferResult> callback) override
872909 {
873910 /* Handle s3:// URIs by converting to HTTPS and optionally adding auth */
874911 if (request.uri .scheme () == " s3" ) {
875912 auto modifiedRequest = request;
876913 modifiedRequest.setupForS3 ();
877- enqueueItem (std::make_shared<TransferItem>(*this , std::move (modifiedRequest), std::move (callback)));
878- return ;
914+ return enqueueItem (make_ref<TransferItem>(*this , std::move (modifiedRequest), std::move (callback)));
879915 }
880916
881- enqueueItem (std::make_shared<TransferItem>(*this , request, std::move (callback)));
917+ return enqueueItem (make_ref<TransferItem>(*this , request, std::move (callback)));
918+ }
919+
920+ void unpauseTransfer (ref<TransferItem> item)
921+ {
922+ auto state (state_.lock ());
923+ state->unpause .push_back (std::move (item));
924+ #ifndef _WIN32 // TODO need graceful async exit support on Windows?
925+ writeFull (wakeupPipe.writeSide .get (), " " );
926+ #endif
927+ }
928+
929+ void unpauseTransfer (ItemHandle handle) override
930+ {
931+ unpauseTransfer (ref{static_cast <TransferItem &>(handle.item .get ()).shared_from_this ()});
882932 }
883933};
884934
@@ -975,6 +1025,7 @@ void FileTransfer::download(
9751025 struct State
9761026 {
9771027 bool quit = false ;
1028+ bool paused = false ;
9781029 std::exception_ptr exc;
9791030 std::string data;
9801031 std::condition_variable avail, request;
@@ -990,31 +1041,38 @@ void FileTransfer::download(
9901041 state->request .notify_one ();
9911042 });
9921043
993- request.dataCallback = [_state](std::string_view data) {
1044+ request.dataCallback = [_state, uri = request. uri . to_string () ](std::string_view data) -> PauseTransfer {
9941045 auto state (_state->lock ());
9951046
9961047 if (state->quit )
997- return ;
998-
999- /* If the buffer is full, then go to sleep until the calling
1000- thread wakes us up (i.e. when it has removed data from the
1001- buffer). We don't wait forever to prevent stalling the
1002- download thread. (Hopefully sleeping will throttle the
1003- sender.) */
1004- if (state->data .size () > fileTransferSettings.downloadBufferSize ) {
1005- debug (" download buffer is full; going to sleep" );
1006- static bool haveWarned = false ;
1007- warnOnce (haveWarned, " download buffer is full; consider increasing the 'download-buffer-size' setting" );
1008- state.wait_for (state->request , std::chrono::seconds (10 ));
1009- }
1048+ return PauseTransfer::No;
10101049
10111050 /* Append data to the buffer and wake up the calling
10121051 thread. */
10131052 state->data .append (data);
10141053 state->avail .notify_one ();
1054+
1055+ if (state->data .size () <= fileTransferSettings.downloadBufferSize )
1056+ return PauseTransfer::No;
1057+
1058+ /* dataCallback gets called multiple times by an intermediate sink. Only
1059+ issue the debug message the first time around. */
1060+ if (!state->paused )
1061+ debug (
1062+ " pausing transfer for '%s': download buffer is full (%d > %d)" ,
1063+ uri,
1064+ state->data .size (),
1065+ fileTransferSettings.downloadBufferSize );
1066+
1067+ state->paused = true ;
1068+
1069+ /* Technically the buffer might become larger than
1070+ downloadBufferSize, but with sinks there's no way to avoid
1071+ consuming data. */
1072+ return PauseTransfer::Yes;
10151073 };
10161074
1017- enqueueFileTransfer (
1075+ auto handle = enqueueFileTransfer (
10181076 request, {[_state, resultCallback{std::move (resultCallback)}](std::future<FileTransferResult> fut) {
10191077 auto state (_state->lock ());
10201078 state->quit = true ;
@@ -1047,6 +1105,10 @@ void FileTransfer::download(
10471105 return ;
10481106 }
10491107
1108+ if (state->paused ) {
1109+ unpauseTransfer (handle);
1110+ state->paused = false ;
1111+ }
10501112 state.wait (state->avail );
10511113
10521114 if (state->data .empty ())
0 commit comments