Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,11 @@ await _activityTracer.TraceActivityAsync(async activity =>

try
{
// Keep track of active download tasks
var downloadTasks = new ConcurrentDictionary<Task, IDownloadResult>();
// Keep track of active download tasks keyed by ChunkIndex.
// Keying by Task itself is unsafe here: the value stored is the continuation
// task, but the continuation's lambda parameter `t` is the antecedent, so
// TryRemove(t, ...) never matches and entries accumulate until method exit.
var downloadTasks = new ConcurrentDictionary<long, Task>();
var downloadTaskCompletionSource = new TaskCompletionSource<bool>();

// Process items from the download queue until it's completed
Expand Down Expand Up @@ -320,7 +323,7 @@ await _activityTracer.TraceActivityAsync(async activity =>
{
try
{
await Task.WhenAll(downloadTasks.Keys).ConfigureAwait(false);
await Task.WhenAll(downloadTasks.Values).ConfigureAwait(false);
}
catch (Exception ex)
{
Expand Down Expand Up @@ -375,14 +378,15 @@ await _activityTracer.TraceActivityAsync(async activity =>
]);

// Start the download task
long chunkIndex = downloadResult.ChunkIndex;
Task downloadTask = DownloadFileAsync(downloadResult, cancellationToken)
.ContinueWith(t =>
{
// Release the download slot
_downloadSemaphore.Release();

// Remove the task from the dictionary
downloadTasks.TryRemove(t, out _);
downloadTasks.TryRemove(chunkIndex, out _);

// Handle any exceptions
if (t.IsFaulted)
Expand Down Expand Up @@ -413,7 +417,7 @@ await _activityTracer.TraceActivityAsync(async activity =>
}, cancellationToken);

// Add the task to the dictionary
downloadTasks[downloadTask] = downloadResult;
downloadTasks[chunkIndex] = downloadTask;

// Add the result to the result queue add the result here to assure the download sequence.
_resultQueue.Add(downloadResult, cancellationToken);
Expand Down
Loading