diff --git a/csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs b/csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs index e4740f186..5f2059431 100644 --- a/csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs +++ b/csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs @@ -286,8 +286,11 @@ await _activityTracer.TraceActivityAsync(async activity => try { - // Keep track of active download tasks - var downloadTasks = new ConcurrentDictionary(); + // Keep track of active download tasks keyed by ChunkIndex. + // Keying by Task itself is unsafe here: the value stored is the continuation + // task, but the continuation's lambda parameter `t` is the antecedent, so + // TryRemove(t, ...) never matches and entries accumulate until method exit. + var downloadTasks = new ConcurrentDictionary(); var downloadTaskCompletionSource = new TaskCompletionSource(); // Process items from the download queue until it's completed @@ -320,7 +323,7 @@ await _activityTracer.TraceActivityAsync(async activity => { try { - await Task.WhenAll(downloadTasks.Keys).ConfigureAwait(false); + await Task.WhenAll(downloadTasks.Values).ConfigureAwait(false); } catch (Exception ex) { @@ -375,6 +378,7 @@ await _activityTracer.TraceActivityAsync(async activity => ]); // Start the download task + long chunkIndex = downloadResult.ChunkIndex; Task downloadTask = DownloadFileAsync(downloadResult, cancellationToken) .ContinueWith(t => { @@ -382,7 +386,7 @@ await _activityTracer.TraceActivityAsync(async activity => _downloadSemaphore.Release(); // Remove the task from the dictionary - downloadTasks.TryRemove(t, out _); + downloadTasks.TryRemove(chunkIndex, out _); // Handle any exceptions if (t.IsFaulted) @@ -413,7 +417,7 @@ await _activityTracer.TraceActivityAsync(async activity => }, cancellationToken); // Add the task to the dictionary - downloadTasks[downloadTask] = downloadResult; + downloadTasks[chunkIndex] = downloadTask; // Add the result to the result queue add the result here to assure the download sequence. _resultQueue.Add(downloadResult, cancellationToken);