diff --git a/RuriLib.Parallelization.Tests/ParallelizerTests.cs b/RuriLib.Parallelization.Tests/ParallelizerTests.cs index 5265e16c2..200425894 100644 --- a/RuriLib.Parallelization.Tests/ParallelizerTests.cs +++ b/RuriLib.Parallelization.Tests/ParallelizerTests.cs @@ -6,271 +6,273 @@ using System.Threading.Tasks; using Xunit; -namespace RuriLib.Parallelization.Tests +namespace RuriLib.Parallelization.Tests; + +public class ParallelizerTests { - public class ParallelizerTests - { - private readonly Func> parityCheck - = new((number, token) => Task.FromResult(number % 2 == 0)); - - private readonly Func> longTask - = new(async (number, token) => { await Task.Delay(100); return true; }); + private readonly Func> _parityCheck + = (number, _) => Task.FromResult(number % 2 == 0); - private readonly ParallelizerType type = ParallelizerType.TaskBased; - private int progressCount; - private bool lastResult; - private bool completedFlag; - private Exception lastException; - - private void OnProgress(object sender, float value) => progressCount++; - private void OnResult(object sender, ResultDetails value) => lastResult = value.Result; - private void OnCompleted(object sender, EventArgs e) => completedFlag = true; - private void OnException(object sender, Exception ex) => lastException = ex; - - [Fact] - public async Task Run_QuickTasks_CompleteAndCall() - { - var count = 100; - var parallelizer = ParallelizerFactory.Create( - type: type, - workItems: Enumerable.Range(1, count), - workFunction: parityCheck, - degreeOfParallelism: 1, - totalAmount: count, - skip: 0); - - progressCount = 0; - completedFlag = false; - lastException = null; - parallelizer.ProgressChanged += OnProgress; - parallelizer.NewResult += OnResult; - parallelizer.Completed += OnCompleted; - parallelizer.Error += OnException; + private readonly Func> _longTask + = async (_, _) => { await Task.Delay(100); return true; }; + + private const ParallelizerType _type = ParallelizerType.TaskBased; + private int _progressCount; + private bool _lastResult; + private bool _completedFlag; + private Exception _lastException; + + private void OnProgress(object sender, float value) => _progressCount++; + + private void OnResult(object sender, ResultDetails value) => _lastResult = value.Result; + + private void OnCompleted(object sender, EventArgs e) => _completedFlag = true; + + private void OnException(object sender, Exception ex) => _lastException = ex; + + [Fact] + public async Task Run_QuickTasks_CompleteAndCall() + { + const int count = 100; + var parallelizer = ParallelizerFactory.Create( + type: _type, + workItems: Enumerable.Range(1, count), + workFunction: _parityCheck, + degreeOfParallelism: 1, + totalAmount: count, + skip: 0); + + _progressCount = 0; + _completedFlag = false; + _lastException = null; + parallelizer.ProgressChanged += OnProgress; + parallelizer.NewResult += OnResult; + parallelizer.Completed += OnCompleted; + parallelizer.Error += OnException; - await parallelizer.Start(); - - var cts = new CancellationTokenSource(); - cts.CancelAfter(10000); - await parallelizer.WaitCompletion(cts.Token); - - Assert.Equal(100, progressCount); - Assert.True(completedFlag); - Assert.Null(lastException); - Assert.True(lastResult); - } - - [Fact] - public async Task Run_QuickTasks_StopwatchStops() - { - var count = 100; - var parallelizer = ParallelizerFactory.Create( - type: type, - workItems: Enumerable.Range(1, count), - workFunction: parityCheck, - degreeOfParallelism: 1, - totalAmount: count, - skip: 0); - - await parallelizer.Start(); - - var cts = new CancellationTokenSource(); - cts.CancelAfter(10000); - await parallelizer.WaitCompletion(cts.Token); - - var elapsed = parallelizer.Elapsed; - await Task.Delay(1000); - Assert.Equal(elapsed, parallelizer.Elapsed); - } - - [Fact] - public async Task Run_LongTasks_StopBeforeCompletion() - { - // In theory this should take 1000 * 100 / 10 = 10.000 ms = 10 seconds - var parallelizer = ParallelizerFactory.Create( - type: type, - workItems: Enumerable.Range(1, 1000), - workFunction: longTask, - degreeOfParallelism: 10, - totalAmount: 1000, - skip: 0); - - progressCount = 0; - completedFlag = false; - lastException = null; - parallelizer.ProgressChanged += OnProgress; - parallelizer.Completed += OnCompleted; - parallelizer.Error += OnException; - - await parallelizer.Start(); - await Task.Delay(250); - - await parallelizer.Stop(); - - Assert.InRange(progressCount, 10, 50); - Assert.True(completedFlag); - Assert.Null(lastException); - } - - [Fact] - public async Task Run_LongTasks_AbortBeforeCompletion() - { - // In theory this should take 1000 * 100 / 10 = 10.000 ms = 10 seconds - var parallelizer = ParallelizerFactory.Create( - type: type, - workItems: Enumerable.Range(1, 1000), - workFunction: longTask, - degreeOfParallelism: 10, - totalAmount: 1000, - skip: 0); + await parallelizer.Start(); + + var cts = new CancellationTokenSource(); + cts.CancelAfter(10000); + await parallelizer.WaitCompletion(cts.Token); + + Assert.Equal(100, _progressCount); + Assert.True(_completedFlag); + Assert.Null(_lastException); + Assert.True(_lastResult); + } + + [Fact] + public async Task Run_QuickTasks_StopwatchStops() + { + const int count = 100; + var parallelizer = ParallelizerFactory.Create( + type: _type, + workItems: Enumerable.Range(1, count), + workFunction: _parityCheck, + degreeOfParallelism: 1, + totalAmount: count, + skip: 0); + + await parallelizer.Start(); + + var cts = new CancellationTokenSource(); + cts.CancelAfter(10000); + await parallelizer.WaitCompletion(cts.Token); + + var elapsed = parallelizer.Elapsed; + await Task.Delay(1000, cts.Token); + Assert.Equal(elapsed, parallelizer.Elapsed); + } + + [Fact] + public async Task Run_LongTasks_StopBeforeCompletion() + { + // In theory this should take 1000 * 100 / 10 = 10.000 ms = 10 seconds + var parallelizer = ParallelizerFactory.Create( + type: _type, + workItems: Enumerable.Range(1, 1000), + workFunction: _longTask, + degreeOfParallelism: 10, + totalAmount: 1000, + skip: 0); + + _progressCount = 0; + _completedFlag = false; + _lastException = null; + parallelizer.ProgressChanged += OnProgress; + parallelizer.Completed += OnCompleted; + parallelizer.Error += OnException; + + await parallelizer.Start(); + await Task.Delay(250); + + await parallelizer.Stop(); + + Assert.InRange(_progressCount, 10, 50); + Assert.True(_completedFlag); + Assert.Null(_lastException); + } + + [Fact] + public async Task Run_LongTasks_AbortBeforeCompletion() + { + // In theory this should take 1000 * 100 / 10 = 10.000 ms = 10 seconds + var parallelizer = ParallelizerFactory.Create( + type: _type, + workItems: Enumerable.Range(1, 1000), + workFunction: _longTask, + degreeOfParallelism: 10, + totalAmount: 1000, + skip: 0); - progressCount = 0; - completedFlag = false; - lastException = null; - parallelizer.ProgressChanged += OnProgress; - parallelizer.Completed += OnCompleted; - parallelizer.Error += OnException; - - await parallelizer.Start(); - await Task.Delay(250); - - await parallelizer.Abort(); - - var cts = new CancellationTokenSource(); - cts.CancelAfter(10000); - await parallelizer.WaitCompletion(cts.Token); - - Assert.InRange(progressCount, 10, 50); - Assert.True(completedFlag); - Assert.Null(lastException); - } - - [Fact] - public async Task Run_IncreaseConcurrentThreads_CompleteFaster() - { - var parallelizer = ParallelizerFactory.Create( - type: type, - workItems: Enumerable.Range(1, 10), - workFunction: longTask, - degreeOfParallelism: 1, - totalAmount: 10, - skip: 0); + _progressCount = 0; + _completedFlag = false; + _lastException = null; + parallelizer.ProgressChanged += OnProgress; + parallelizer.Completed += OnCompleted; + parallelizer.Error += OnException; + + await parallelizer.Start(); + await Task.Delay(250); + + await parallelizer.Abort(); + + var cts = new CancellationTokenSource(); + cts.CancelAfter(10000); + await parallelizer.WaitCompletion(cts.Token); + + Assert.InRange(_progressCount, 10, 50); + Assert.True(_completedFlag); + Assert.Null(_lastException); + } + + [Fact] + public async Task Run_IncreaseConcurrentThreads_CompleteFaster() + { + var parallelizer = ParallelizerFactory.Create( + type: _type, + workItems: Enumerable.Range(1, 10), + workFunction: _longTask, + degreeOfParallelism: 1, + totalAmount: 10, + skip: 0); - var stopwatch = new Stopwatch(); - - // Start with 1 concurrent task - stopwatch.Start(); - await parallelizer.Start(); - - // Wait for 2 rounds to fully complete - await Task.Delay(250); - - // Release 3 more slots - await parallelizer.ChangeDegreeOfParallelism(4); - - // Wait until finished - var cts = new CancellationTokenSource(); - cts.CancelAfter(10000); - await parallelizer.WaitCompletion(cts.Token); - stopwatch.Stop(); - - // Make sure it took less than 10 * 100 ms (let's say 800) - Assert.InRange(stopwatch.ElapsedMilliseconds, 0, 800); - } - - [Fact] - public async Task Run_DecreaseConcurrentThreads_CompleteSlower() - { - var parallelizer = ParallelizerFactory.Create( - type: type, - workItems: Enumerable.Range(1, 12), - workFunction: longTask, - degreeOfParallelism: 3, - totalAmount: 12, - skip: 0); - - var stopwatch = new Stopwatch(); - - // Start with 3 concurrent tasks - stopwatch.Start(); - await parallelizer.Start(); - - // Wait for 1 round to complete (a.k.a 3 completed since there are 3 concurrent threads) - await Task.Delay(150); - - // Remove 2 slots - await parallelizer.ChangeDegreeOfParallelism(1); - - // Wait until finished - var cts = new CancellationTokenSource(); - cts.CancelAfter(10000); - await parallelizer.WaitCompletion(cts.Token); - stopwatch.Stop(); - - // Make sure it took more than 12 * 100 / 3 = 400 ms (we'll say 600 to make sure) - Assert.True(stopwatch.ElapsedMilliseconds > 600); - } - - [Fact] - public async Task Run_PauseAndResume_CompleteAll() - { - var count = 10; - var parallelizer = ParallelizerFactory.Create( - type: type, - workItems: Enumerable.Range(1, count), - workFunction: longTask, - degreeOfParallelism: 1, - totalAmount: count, - skip: 0); - - progressCount = 0; - completedFlag = false; - lastException = null; - parallelizer.ProgressChanged += OnProgress; - parallelizer.NewResult += OnResult; - parallelizer.Completed += OnCompleted; - parallelizer.Error += OnException; - - await parallelizer.Start(); - await Task.Delay(150); - await parallelizer.Pause(); - - // Make sure it's actually paused and nothing is going on - var progress = progressCount; - await Task.Delay(1000); - Assert.Equal(progress, progressCount); - - await parallelizer.Resume(); - - var cts = new CancellationTokenSource(); - cts.CancelAfter(10000); - await parallelizer.WaitCompletion(cts.Token); - - Assert.Equal(count, progressCount); - Assert.True(completedFlag); - Assert.Null(lastException); - } - - [Fact] - public async Task Run_Pause_StopwatchStops() - { - var count = 10; - var parallelizer = ParallelizerFactory.Create( - type: type, - workItems: Enumerable.Range(1, count), - workFunction: longTask, - degreeOfParallelism: 1, - totalAmount: count, - skip: 0); - - await parallelizer.Start(); - await Task.Delay(150); - await parallelizer.Pause(); - - var elapsed = parallelizer.Elapsed; - await Task.Delay(1000); - Assert.Equal(elapsed, parallelizer.Elapsed); - - await parallelizer.Abort(); - } + var stopwatch = new Stopwatch(); + + // Start with 1 concurrent task + stopwatch.Start(); + await parallelizer.Start(); + + // Wait for 2 rounds to fully complete + await Task.Delay(250); + + // Release 3 more slots + await parallelizer.ChangeDegreeOfParallelism(4); + + // Wait until finished + var cts = new CancellationTokenSource(); + cts.CancelAfter(10000); + await parallelizer.WaitCompletion(cts.Token); + stopwatch.Stop(); + + // Make sure it took less than 10 * 100 ms (let's say 800) + Assert.InRange(stopwatch.ElapsedMilliseconds, 0, 800); + } + + [Fact] + public async Task Run_DecreaseConcurrentThreads_CompleteSlower() + { + var parallelizer = ParallelizerFactory.Create( + type: _type, + workItems: Enumerable.Range(1, 12), + workFunction: _longTask, + degreeOfParallelism: 3, + totalAmount: 12, + skip: 0); + + var stopwatch = new Stopwatch(); + + // Start with 3 concurrent tasks + stopwatch.Start(); + await parallelizer.Start(); + + // Wait for 1 round to complete (a.k.a 3 completed since there are 3 concurrent threads) + await Task.Delay(150); + + // Remove 2 slots + await parallelizer.ChangeDegreeOfParallelism(1); + + // Wait until finished + var cts = new CancellationTokenSource(); + cts.CancelAfter(10000); + await parallelizer.WaitCompletion(cts.Token); + stopwatch.Stop(); + + // Make sure it took more than 12 * 100 / 3 = 400 ms (we'll say 600 to make sure) + Assert.True(stopwatch.ElapsedMilliseconds > 600); + } + + [Fact] + public async Task Run_PauseAndResume_CompleteAll() + { + var count = 10; + var parallelizer = ParallelizerFactory.Create( + type: _type, + workItems: Enumerable.Range(1, count), + workFunction: _longTask, + degreeOfParallelism: 1, + totalAmount: count, + skip: 0); + + _progressCount = 0; + _completedFlag = false; + _lastException = null; + parallelizer.ProgressChanged += OnProgress; + parallelizer.NewResult += OnResult; + parallelizer.Completed += OnCompleted; + parallelizer.Error += OnException; + + await parallelizer.Start(); + await Task.Delay(150); + await parallelizer.Pause(); + + // Make sure it's actually paused and nothing is going on + var progress = _progressCount; + await Task.Delay(1000); + Assert.Equal(progress, _progressCount); + + await parallelizer.Resume(); + + var cts = new CancellationTokenSource(); + cts.CancelAfter(10000); + await parallelizer.WaitCompletion(cts.Token); + + Assert.Equal(count, _progressCount); + Assert.True(_completedFlag); + Assert.Null(_lastException); + } + + [Fact] + public async Task Run_Pause_StopwatchStops() + { + const int count = 10; + var parallelizer = ParallelizerFactory.Create( + type: _type, + workItems: Enumerable.Range(1, count), + workFunction: _longTask, + degreeOfParallelism: 1, + totalAmount: count, + skip: 0); + + await parallelizer.Start(); + await Task.Delay(150); + await parallelizer.Pause(); + + var elapsed = parallelizer.Elapsed; + await Task.Delay(1000); + Assert.Equal(elapsed, parallelizer.Elapsed); + + await parallelizer.Abort(); } } diff --git a/RuriLib.Parallelization/Exceptions/RequiredStatusException.cs b/RuriLib.Parallelization/Exceptions/RequiredStatusException.cs index 717124a38..fab757e9d 100644 --- a/RuriLib.Parallelization/Exceptions/RequiredStatusException.cs +++ b/RuriLib.Parallelization/Exceptions/RequiredStatusException.cs @@ -1,31 +1,53 @@ using System; +using System.Linq; -namespace RuriLib.Parallelization.Exceptions +namespace RuriLib.Parallelization.Exceptions; + +/// +/// Exception that is thrown when a method of a can only +/// be executed when its status has one of the specified values. +/// +public class RequiredStatusException : Exception { /// - /// Exception that is thrown when a method of a can only - /// be executed when its status has one of the specified values. + /// When the requires a status of + /// but the status was . /// - public class RequiredStatusException : Exception + private RequiredStatusException(ParallelizerStatus actualStatus, ParallelizerStatus requiredStatus) + : base($"The operation can only be performed when the Task Manager is in a {requiredStatus} status, but the status was {actualStatus}.") { - /// - /// When the requires a status of - /// but the status was . - /// - public RequiredStatusException(ParallelizerStatus requiredStatus, ParallelizerStatus actualStatus) - : base($"The operation can only be performed when the Task Manager is in a {requiredStatus} status, but the status was {actualStatus}.") - { - } + } - /// - /// When the requires a status in the array - /// but the status was . - /// - public RequiredStatusException(ParallelizerStatus[] requiredStatuses, ParallelizerStatus actualStatus) - : base($"The operation can only be performed when the Task Manager is in one of these statuses: {string.Join(", ", requiredStatuses)}, but the status was {actualStatus}.") - { + /// + /// When the requires a status in the array + /// but the status was . + /// + private RequiredStatusException(ParallelizerStatus actualStatus, ParallelizerStatus[] requiredStatuses) + : base($"The operation can only be performed when the Task Manager is in one of these statuses: {string.Join(", ", requiredStatuses)}, but the status was {actualStatus}.") + { + } + + /// + /// Throws a if the is not equal to the . + /// + public static void ThrowIfNot(ParallelizerStatus actualStatus, ParallelizerStatus requiredStatus) + { + if (actualStatus != requiredStatus) + { + throw new RequiredStatusException(actualStatus, requiredStatus); + } + } + + /// + /// Throws a if the is not one of the . + /// + public static void ThrowIfNot(ParallelizerStatus actualStatus, ParallelizerStatus[] requiredStatuses) + { + if (!requiredStatuses.Contains(actualStatus)) + { + throw new RequiredStatusException(actualStatus, requiredStatuses); } } } diff --git a/RuriLib.Parallelization/Models/ErrorDetails.cs b/RuriLib.Parallelization/Models/ErrorDetails.cs index 73035fc17..f6de15f59 100644 --- a/RuriLib.Parallelization/Models/ErrorDetails.cs +++ b/RuriLib.Parallelization/Models/ErrorDetails.cs @@ -1,27 +1,26 @@ using System; -namespace RuriLib.Parallelization.Models +namespace RuriLib.Parallelization.Models; + +/// +/// Details of an error that happened while processing a work item. +/// +/// +public class ErrorDetails { + /// The item that was being processed by the operation. + public TInput Item { get; set; } + + /// The exception thrown by the operation. + public Exception Exception { get; set; } + /// - /// Details of an error that happened while processing a work item. + /// Creates error details for a given for which the work + /// function generated a given /// - /// - public class ErrorDetails + public ErrorDetails(TInput item, Exception exception) { - /// The item that was being processed by the operation. - public TInput Item { get; set; } - - /// The exception thrown by the operation. - public Exception Exception { get; set; } - - /// - /// Creates error details for a given for which the work - /// function generated a given - /// - public ErrorDetails(TInput item, Exception exception) - { - Item = item; - Exception = exception; - } + Item = item; + Exception = exception; } } diff --git a/RuriLib.Parallelization/Models/ResultDetails.cs b/RuriLib.Parallelization/Models/ResultDetails.cs index 2cef522b2..14567b109 100644 --- a/RuriLib.Parallelization/Models/ResultDetails.cs +++ b/RuriLib.Parallelization/Models/ResultDetails.cs @@ -1,26 +1,25 @@ -namespace RuriLib.Parallelization.Models +namespace RuriLib.Parallelization.Models; + +/// +/// Details the result of the execution of the work functions on a given input. +/// +/// The type of input +/// The type of output +public class ResultDetails { + /// The item that was being processed by the operation. + public TInput Item { get; set; } + + /// The result returned by the operation. + public TOutput Result { get; set; } + /// - /// Details the result of the execution of the work functions on a given input. + /// Creates result details for a given for which + /// the work function generated a given . /// - /// The type of input - /// The type of output - public class ResultDetails + public ResultDetails(TInput item, TOutput result) { - /// The item that was being processed by the operation. - public TInput Item { get; set; } - - /// The result returned by the operation. - public TOutput Result { get; set; } - - /// - /// Creates result details for a given for which - /// the work function generated a given . - /// - public ResultDetails(TInput item, TOutput result) - { - Item = item; - Result = result; - } + Item = item; + Result = result; } } diff --git a/RuriLib.Parallelization/ParallelBasedParallelizer.cs b/RuriLib.Parallelization/ParallelBasedParallelizer.cs index 21e517e78..ce3722944 100644 --- a/RuriLib.Parallelization/ParallelBasedParallelizer.cs +++ b/RuriLib.Parallelization/ParallelBasedParallelizer.cs @@ -4,121 +4,119 @@ using System.Threading; using System.Threading.Tasks; -namespace RuriLib.Parallelization +namespace RuriLib.Parallelization; + +/// +/// Parallelizer that uses the Parallel.ForEachAsync function. +/// +public class ParallelBasedParallelizer : Parallelizer { - /// - /// Parallelizer that uses the Parallel.ForEachAsync function. - /// - public class ParallelBasedParallelizer : Parallelizer + #region Constructors + /// + public ParallelBasedParallelizer(IEnumerable workItems, Func> workFunction, + int degreeOfParallelism, long totalAmount, int skip = 0, int maxDegreeOfParallelism = 200) + : base(workItems, workFunction, degreeOfParallelism, totalAmount, skip, maxDegreeOfParallelism) { - #region Constructors - /// - public ParallelBasedParallelizer(IEnumerable workItems, Func> workFunction, - int degreeOfParallelism, long totalAmount, int skip = 0, int maxDegreeOfParallelism = 200) - : base(workItems, workFunction, degreeOfParallelism, totalAmount, skip, maxDegreeOfParallelism) - { - - } - #endregion - #region Public Methods - /// - public async override Task Start() - { - await base.Start().ConfigureAwait(false); + } + #endregion - stopwatch.Restart(); - Status = ParallelizerStatus.Running; - _ = Task.Run(() => Run()).ConfigureAwait(false); - } + #region Public Methods + /// + public override async Task Start() + { + await base.Start().ConfigureAwait(false); - /// - public async override Task Pause() - { - await base.Pause().ConfigureAwait(false); + Stopwatch.Restart(); + Status = ParallelizerStatus.Running; + _ = Task.Run(Run).ConfigureAwait(false); + } - throw new NotSupportedException("This parallelizer does not support pausing"); - } + /// + public override async Task Pause() + { + await base.Pause().ConfigureAwait(false); - /// - public async override Task Resume() - { - await base.Resume().ConfigureAwait(false); + throw new NotSupportedException("This parallelizer does not support pausing"); + } - throw new NotSupportedException("This parallelizer does not support resuming"); - } + /// + public override async Task Resume() + { + await base.Resume().ConfigureAwait(false); - /// - public async override Task Stop() - { - await base.Stop().ConfigureAwait(false); + throw new NotSupportedException("This parallelizer does not support resuming"); + } - throw new NotSupportedException("This parallelizer does not support soft stopping"); - } + /// + public override async Task Stop() + { + await base.Stop().ConfigureAwait(false); - /// - public async override Task Abort() - { - await base.Abort().ConfigureAwait(false); + throw new NotSupportedException("This parallelizer does not support soft stopping"); + } - Status = ParallelizerStatus.Stopping; - hardCTS.Cancel(); - softCTS.Cancel(); - await WaitCompletion().ConfigureAwait(false); - } + /// + public override async Task Abort() + { + await base.Abort().ConfigureAwait(false); - /// - public async override Task ChangeDegreeOfParallelism(int newValue) - { - await base.ChangeDegreeOfParallelism(newValue); + Status = ParallelizerStatus.Stopping; + await HardCts.CancelAsync(); + await SoftCts.CancelAsync(); + await WaitCompletion().ConfigureAwait(false); + } - if (Status == ParallelizerStatus.Idle) - { - degreeOfParallelism = newValue; - return; - } + /// + public override async Task ChangeDegreeOfParallelism(int newValue) + { + await base.ChangeDegreeOfParallelism(newValue); + if (Status != ParallelizerStatus.Idle) + { throw new NotSupportedException("You cannot change the DoP while this parallelizer is running"); } - #endregion + + DegreeOfParallelism = newValue; + } + #endregion - #region Private Methods - // Run is executed in fire and forget mode (not awaited) - private async void Run() - { - // Skip the items - var items = workItems.Skip(skip); + #region Private Methods + // Run is executed in fire and forget mode (not awaited) + private async void Run() + { + // Skip the items + var items = WorkItems.Skip(Skip); - try - { - var options = new ParallelOptions - { - MaxDegreeOfParallelism = degreeOfParallelism, - TaskScheduler = TaskScheduler.Default, - CancellationToken = hardCTS.Token - }; - await Parallel.ForEachAsync(items, options, async (item, token) => - { - await taskFunction(item).ConfigureAwait(false); - }); - } - catch (TaskCanceledException) - { - // Operation aborted, don't throw the error - } - catch (Exception ex) + try + { + var options = new ParallelOptions { - OnError(ex); - } - finally + MaxDegreeOfParallelism = DegreeOfParallelism, + TaskScheduler = TaskScheduler.Default, + CancellationToken = HardCts.Token + }; + await Parallel.ForEachAsync(items, options, async (item, _) => { - OnCompleted(); - Status = ParallelizerStatus.Idle; - hardCTS?.Dispose(); - softCTS?.Dispose(); - stopwatch?.Stop(); - } + await TaskFunction(item).ConfigureAwait(false); + }); + } + catch (TaskCanceledException) + { + // Operation aborted, don't throw the error + } + catch (Exception ex) + { + OnError(ex); + } + finally + { + OnCompleted(); + Status = ParallelizerStatus.Idle; + HardCts.Dispose(); + SoftCts.Dispose(); + Stopwatch.Stop(); } - #endregion } -} \ No newline at end of file + #endregion +} diff --git a/RuriLib.Parallelization/Parallelizer.cs b/RuriLib.Parallelization/Parallelizer.cs index 46d54aa9c..881970126 100644 --- a/RuriLib.Parallelization/Parallelizer.cs +++ b/RuriLib.Parallelization/Parallelizer.cs @@ -8,383 +8,414 @@ using System.Threading; using System.Threading.Tasks; -namespace RuriLib.Parallelization +namespace RuriLib.Parallelization; + +/// +/// Provides a managed way to execute parallelized work. +/// +/// The type of the workload items +/// The type of the results +public abstract class Parallelizer : IDisposable { + #region Public Fields /// - /// Provides a managed way to execute parallelized work. + /// The maximum value that the degree of parallelism can have when changed through the + /// method. /// - /// The type of the workload items - /// The type of the results - public abstract class Parallelizer - { - #region Public Fields - /// - /// The maximum value that the degree of parallelism can have when changed through the - /// method. - /// - public int MaxDegreeOfParallelism { get; set; } = 200; + public int MaxDegreeOfParallelism { get; set; } - /// - /// The current status of the parallelizer. - /// - public ParallelizerStatus Status + /// + /// The current status of the parallelizer. + /// + public ParallelizerStatus Status + { + get => _status; + protected set { - get => status; - protected set - { - status = value; - OnStatusChanged(status); - } + _status = value; + OnStatusChanged(_status); } + } - /// - /// Retrieves the current progress in the interval [0, 1]. - /// The progress is -1 if the manager hasn't been started yet. - /// - public float Progress => (float)(processed + skip) / totalAmount; + /// + /// Retrieves the current progress in the interval [0, 1]. + /// The progress is -1 if the manager hasn't been started yet. + /// + public float Progress => (float)(Processed + Skip) / TotalAmount; - /// - /// Retrieves the completed work per minute. - /// - public int CPM { get; protected set; } = 0; + /// + /// Retrieves the completed work per minute. + /// + public int CPM { get; protected set; } - /// - /// Sets a maximum threshold for CPM. 0 to disable. - /// - public int CPMLimit { get; set; } = 0; + /// + /// Sets a maximum threshold for CPM. 0 to disable. + /// + public int CPMLimit { get; set; } = 0; - /// - /// The time when the parallelizer started its work for its last running session. - /// - public DateTime StartTime { get; private set; } + /// + /// The time when the parallelizer started its work for its last running session. + /// + public DateTime StartTime { get; private set; } - /// - /// The time when the parallelizer finished its work or was stopped ( if it hasn't finished - /// a single session yet). - /// - public DateTime? EndTime { get; private set; } + /// + /// The time when the parallelizer finished its work or was stopped ( if it hasn't finished + /// a single session yet). + /// + public DateTime? EndTime { get; private set; } - /// - /// The Estimated Time of Arrival (when the parallelizer is expected to finish all the work). - /// - public DateTime ETA + /// + /// The Estimated Time of Arrival (when the parallelizer is expected to finish all the work). + /// + public DateTime ETA + { + get { - get - { - var minutes = (totalAmount * (1 - Progress)) / CPM; - return CPM > 0 && minutes < TimeSpan.MaxValue.TotalMinutes - ? StartTime + TimeSpan.FromMinutes(minutes) - : DateTime.MaxValue; - } + var minutes = TotalAmount * (1 - Progress) / CPM; + return CPM > 0 && minutes < TimeSpan.MaxValue.TotalMinutes + ? StartTime + TimeSpan.FromMinutes(minutes) + : DateTime.MaxValue; } + } - /// - /// The time elapsed since the start of the session. - /// - public TimeSpan Elapsed => TimeSpan.FromMilliseconds(stopwatch.ElapsedMilliseconds); + /// + /// The time elapsed since the start of the session. + /// + public TimeSpan Elapsed => TimeSpan.FromMilliseconds(Stopwatch.ElapsedMilliseconds); - /// - /// The expected remaining time to finish all the work. - /// - public TimeSpan Remaining => EndTime.HasValue ? TimeSpan.Zero : ETA - DateTime.Now; - #endregion + /// + /// The expected remaining time to finish all the work. + /// + public TimeSpan Remaining => EndTime.HasValue ? TimeSpan.Zero : ETA - DateTime.Now; + #endregion - #region Protected Fields + #region Protected Fields /// - /// The status of the parallelizer. - /// - protected ParallelizerStatus status = ParallelizerStatus.Idle; + /// The number of items that can be processed concurrently. + /// + protected int DegreeOfParallelism; - /// - /// The number of items that can be processed concurrently. - /// - protected int degreeOfParallelism; + /// + /// The items to process. + /// + protected readonly IEnumerable WorkItems; - /// - /// The items to process. - /// - protected readonly IEnumerable workItems; + /// + /// The function to process items and get results. + /// + protected readonly Func> WorkFunction; - /// - /// The function to process items and get results. - /// - protected readonly Func> workFunction; + /// + /// The function that turns each input item into an awaitable . + /// + protected readonly Func TaskFunction; - /// - /// The function that turns each input item into an awaitable . - /// - protected readonly Func taskFunction; + /// + /// The total amount of work items that are expected to be enumerated (for progress calculations). + /// + protected readonly long TotalAmount; - /// - /// The total amount of work items that are expected to be enumerated (for progress calculations). - /// - protected readonly long totalAmount; + /// + /// The number of items to skip from the start of the collection (to restore previously aborted sessions). + /// + protected readonly int Skip; - /// - /// The number of items to skip from the start of the collection (to restore previously aborted sessions). - /// - protected readonly int skip; + /// + /// The current amount of work items that were processed so far. + /// + protected int Processed; - /// - /// The current amount of work items that were processed so far. - /// - protected int processed = 0; + /// + /// The list of timestamps for CPM calculation. + /// + protected List CheckedTimestamps = []; - /// - /// The list of timestamps for CPM calculation. - /// - protected List checkedTimestamps = new(); + /// + /// A lock that can be used to update the CPM from a single thread at a time. + /// + protected readonly object CpmLock = new(); - /// - /// A lock that can be used to update the CPM from a single thread at a time. - /// - protected readonly object cpmLock = new(); + /// + /// The stopwatch that calculates the elapsed time. + /// + protected readonly Stopwatch Stopwatch = new(); - /// - /// The stopwatch that calculates the elapsed time. - /// - protected readonly Stopwatch stopwatch = new(); + /// + /// A soft cancellation token. Cancel this for soft AND hard abort. + /// + protected CancellationTokenSource SoftCts = new(); - /// - /// A soft cancellation token. Cancel this for soft AND hard abort. - /// - protected CancellationTokenSource softCTS; + /// + /// A hard cancellation token. Cancel this for hard abort only. + /// + protected CancellationTokenSource HardCts = new(); + #endregion + + #region Private Fields + /// + /// The status of the parallelizer. + /// + private ParallelizerStatus _status = ParallelizerStatus.Idle; + #endregion - /// - /// A hard cancellation token. Cancel this for hard abort only. - /// - protected CancellationTokenSource hardCTS; - #endregion + #region Events + /// Called when an operation throws an exception. + public event EventHandler>? TaskError; - #region Events - /// Called when an operation throws an exception. - public event EventHandler> TaskError; + /// + /// Invokes a event. + /// + protected virtual void OnTaskError(ErrorDetails input) => TaskError?.Invoke(this, input); - /// - /// Invokes a event. - /// - protected virtual void OnTaskError(ErrorDetails input) => TaskError?.Invoke(this, input); + /// Called when the itself throws an exception. + public event EventHandler? Error; - /// Called when the itself throws an exception. - public event EventHandler Error; + /// + /// Invokes a event. + /// + protected virtual void OnError(Exception ex) => Error?.Invoke(this, ex); - /// - /// Invokes a event. - /// - protected virtual void OnError(Exception ex) => Error?.Invoke(this, ex); + /// Called when an operation is completed successfully. + public event EventHandler>? NewResult; - /// Called when an operation is completed successfully. - public event EventHandler> NewResult; + /// + /// Invokes a event. + /// + protected virtual void OnNewResult(ResultDetails result) => NewResult?.Invoke(this, result); - /// - /// Invokes a event. - /// - protected virtual void OnNewResult(ResultDetails result) => NewResult?.Invoke(this, result); + /// Called when the progress changes. + public event EventHandler? ProgressChanged; - /// Called when the progress changes. - public event EventHandler ProgressChanged; + /// + /// Invokes a event. + /// + protected virtual void OnProgressChanged(float progress) => ProgressChanged?.Invoke(this, progress); - /// - /// Invokes a event. - /// - protected virtual void OnProgressChanged(float progress) => ProgressChanged?.Invoke(this, progress); + /// Called when all operations were completed successfully. + public event EventHandler? Completed; - /// Called when all operations were completed successfully. - public event EventHandler Completed; + /// + /// Invokes a event. + /// + protected virtual void OnCompleted() => Completed?.Invoke(this, EventArgs.Empty); - /// - /// Invokes a event. - /// - protected virtual void OnCompleted() => Completed?.Invoke(this, EventArgs.Empty); + /// Called when changes. + public event EventHandler? StatusChanged; - /// Called when changes. - public event EventHandler StatusChanged; + /// + /// Invokes a event. + /// + protected virtual void OnStatusChanged(ParallelizerStatus newStatus) => StatusChanged?.Invoke(this, newStatus); + #endregion - /// - /// Invokes a event. - /// - protected virtual void OnStatusChanged(ParallelizerStatus newStatus) => StatusChanged?.Invoke(this, newStatus); - #endregion + #region Constructors + /// + /// Creates a new instance of . + /// + /// The collection of data to process in parallel + /// The work function that must be executed on the data + /// The amount of concurrent tasks that can be started + /// The total amount of data that is expected from + /// The amount of to skip at the beginning + /// The maximum degree of parallelism that can be set + protected Parallelizer(IEnumerable workItems, Func> workFunction, + int degreeOfParallelism, long totalAmount, int skip = 0, int maxDegreeOfParallelism = 200) + { + if (degreeOfParallelism < 1) + { + throw new ArgumentException("The degree of parallelism must be greater than 1"); + } - #region Constructors - /// - /// Creates a new instance of . - /// - /// The collection of data to process in parallel - /// The work function that must be executed on the data - /// The amount of concurrent tasks that can be started - /// The total amount of data that is expected from - /// The amount of to skip at the beginning - /// The maximum degree of parallelism that can be set - public Parallelizer(IEnumerable workItems, Func> workFunction, - int degreeOfParallelism, long totalAmount, int skip = 0, int maxDegreeOfParallelism = 200) + if (degreeOfParallelism > maxDegreeOfParallelism) { - if (degreeOfParallelism < 1) - throw new ArgumentException("The degree of parallelism must be greater than 1"); + throw new ArgumentException("The degree of parallelism must not be greater than the maximum degree of parallelism"); + } - if (degreeOfParallelism > maxDegreeOfParallelism) - throw new ArgumentException("The degree of parallelism must not be greater than the maximum degree of parallelism"); + if (skip >= totalAmount) + { + throw new ArgumentException("The skip must be less than the total amount"); + } - if (skip >= totalAmount) - throw new ArgumentException("The skip must be less than the total amount"); + this.WorkItems = workItems ?? throw new ArgumentNullException(nameof(workItems)); + this.WorkFunction = workFunction ?? throw new ArgumentNullException(nameof(workFunction)); + this.TotalAmount = totalAmount; + this.DegreeOfParallelism = degreeOfParallelism; + this.Skip = skip; + MaxDegreeOfParallelism = maxDegreeOfParallelism; - this.workItems = workItems ?? throw new ArgumentNullException(nameof(workItems)); - this.workFunction = workFunction ?? throw new ArgumentNullException(nameof(workFunction)); - this.totalAmount = totalAmount; - this.degreeOfParallelism = degreeOfParallelism; - this.skip = skip; - MaxDegreeOfParallelism = maxDegreeOfParallelism; + // Assign the task function + TaskFunction = async item => + { + if (SoftCts.IsCancellationRequested) + { + return; + } - // Assign the task function - taskFunction = new Func(async item => + // Try to execute the work and report the result + try { - if (softCTS.IsCancellationRequested) - return; - - // Try to execute the work and report the result - try - { - var workResult = await workFunction.Invoke(item, hardCTS.Token).ConfigureAwait(false); - OnNewResult(new ResultDetails(item, workResult)); - hardCTS.Token.ThrowIfCancellationRequested(); - } - // Catch and report any exceptions - catch (Exception ex) - { - OnTaskError(new ErrorDetails(item, ex)); - } - // Report the progress, update the CPM and release the semaphore slot - finally - { - Interlocked.Increment(ref processed); - OnProgressChanged(Progress); - - checkedTimestamps.Add(Environment.TickCount); - UpdateCPM(); - } - }); - } - #endregion + var workResult = await workFunction.Invoke(item, HardCts.Token).ConfigureAwait(false); + OnNewResult(new ResultDetails(item, workResult)); + HardCts.Token.ThrowIfCancellationRequested(); + } + // Catch and report any exceptions + catch (Exception ex) + { + OnTaskError(new ErrorDetails(item, ex)); + } + // Report the progress, update the CPM and release the semaphore slot + finally + { + Interlocked.Increment(ref Processed); + OnProgressChanged(Progress); - #region Public Methods - /// - /// Starts the execution (without waiting for completion). - /// - public virtual Task Start() - { - if (Status != ParallelizerStatus.Idle) - throw new RequiredStatusException(ParallelizerStatus.Idle, Status); + CheckedTimestamps.Add(Environment.TickCount); + UpdateCpm(); + } + }; + } + #endregion - StartTime = DateTime.Now; - EndTime = null; - checkedTimestamps.Clear(); + #region Public Methods + /// + /// Starts the execution (without waiting for completion). + /// + public virtual Task Start() + { + RequiredStatusException.ThrowIfNot(Status, ParallelizerStatus.Idle); - softCTS = new CancellationTokenSource(); - hardCTS = new CancellationTokenSource(); + Reset(); - return Task.CompletedTask; - } + return Task.CompletedTask; + } - /// Pauses the execution (waits until the ongoing operations are completed). - public virtual Task Pause() - { - if (Status != ParallelizerStatus.Running) - throw new RequiredStatusException(ParallelizerStatus.Running, Status); + /// Pauses the execution (waits until the ongoing operations are completed). + public virtual Task Pause() + { + RequiredStatusException.ThrowIfNot(Status, ParallelizerStatus.Running); - return Task.CompletedTask; - } + return Task.CompletedTask; + } - /// Resumes a paused execution. - public virtual Task Resume() - { - if (Status != ParallelizerStatus.Paused) - throw new RequiredStatusException(ParallelizerStatus.Paused, Status); + /// Resumes a paused execution. + public virtual Task Resume() + { + RequiredStatusException.ThrowIfNot(Status, ParallelizerStatus.Paused); - return Task.CompletedTask; - } + return Task.CompletedTask; + } - /// - /// Stops the execution (waits for the current items to finish). - /// - public virtual Task Stop() - { - if (Status != ParallelizerStatus.Running && Status != ParallelizerStatus.Paused) - throw new RequiredStatusException(new ParallelizerStatus[] { ParallelizerStatus.Running, ParallelizerStatus.Paused }, Status); + /// + /// Stops the execution (waits for the current items to finish). + /// + public virtual Task Stop() + { + RequiredStatusException.ThrowIfNot(Status, + [ + ParallelizerStatus.Running, + ParallelizerStatus.Paused + ]); - EndTime = DateTime.Now; + EndTime = DateTime.Now; - return Task.CompletedTask; - } + return Task.CompletedTask; + } - /// - /// Aborts the execution without waiting for the current work to finish. - /// - public virtual Task Abort() - { - if (Status != ParallelizerStatus.Running && Status != ParallelizerStatus.Paused && Status != ParallelizerStatus.Stopping - && Status != ParallelizerStatus.Pausing) - throw new RequiredStatusException(new ParallelizerStatus[] - { ParallelizerStatus.Running, ParallelizerStatus.Paused, ParallelizerStatus.Stopping, ParallelizerStatus.Pausing}, - Status); + /// + /// Aborts the execution without waiting for the current work to finish. + /// + public virtual Task Abort() + { + RequiredStatusException.ThrowIfNot(Status, + [ + ParallelizerStatus.Running, + ParallelizerStatus.Paused, + ParallelizerStatus.Stopping, + ParallelizerStatus.Pausing + ]); - EndTime = DateTime.Now; + EndTime = DateTime.Now; - return Task.CompletedTask; - } + return Task.CompletedTask; + } - /// - /// Dynamically changes the degree of parallelism. - /// - public virtual Task ChangeDegreeOfParallelism(int newValue) + /// + /// Dynamically changes the degree of parallelism. + /// + public virtual Task ChangeDegreeOfParallelism(int newValue) + { + // This can be 0 because we can use 0 dop as a pausing system + if (newValue < 0 || newValue > MaxDegreeOfParallelism) { - // This can be 0 because we can use 0 dop as a pausing system - if (newValue < 0 || newValue > MaxDegreeOfParallelism) - throw new ArgumentException($"Must be within 0 and {MaxDegreeOfParallelism}", nameof(newValue)); - - return Task.CompletedTask; + throw new ArgumentException($"Must be within 0 and {MaxDegreeOfParallelism}", nameof(newValue)); } - /// - /// An awaitable handler that completes when the is . - /// - public async Task WaitCompletion(CancellationToken cancellationToken = default) + return Task.CompletedTask; + } + + /// + /// An awaitable handler that completes when the is . + /// + public async Task WaitCompletion(CancellationToken cancellationToken = default) + { + while (Status != ParallelizerStatus.Idle) { - while (Status != ParallelizerStatus.Idle) - { - cancellationToken.ThrowIfCancellationRequested(); - await Task.Delay(100, cancellationToken).ConfigureAwait(false); - } + cancellationToken.ThrowIfCancellationRequested(); + await Task.Delay(100, cancellationToken).ConfigureAwait(false); } - #endregion + } + #endregion - #region Protected Methods - /// - /// Whether the CPM is limited to a certain amount (for throttling purposes). - /// - /// - protected bool IsCPMLimited() => CPMLimit > 0 && CPM > CPMLimit; + #region Protected Methods + /// + /// Whether the CPM is limited to a certain amount (for throttling purposes). + /// + /// + protected bool IsCpmLimited() => CPMLimit > 0 && CPM > CPMLimit; - /// - /// Updates the CPM (safe to be called from multiple threads). - /// - [MethodImpl(MethodImplOptions.AggressiveOptimization)] - protected void UpdateCPM() + /// + /// Updates the CPM (safe to be called from multiple threads). + /// + [MethodImpl(MethodImplOptions.AggressiveOptimization)] + protected void UpdateCpm() + { + // Update CPM (only 1 thread can enter) + if (!Monitor.TryEnter(CpmLock)) { - // Update CPM (only 1 thread can enter) - if (Monitor.TryEnter(cpmLock)) - { - try - { - var now = DateTime.Now; - checkedTimestamps = checkedTimestamps.Where(t => Environment.TickCount - t < 60000).ToList(); - CPM = checkedTimestamps.Count; - } - finally - { - Monitor.Exit(cpmLock); - } - } + return; } - #endregion + + try + { + CheckedTimestamps = CheckedTimestamps.Where(t => Environment.TickCount - t < 60000).ToList(); + CPM = CheckedTimestamps.Count; + } + finally + { + Monitor.Exit(CpmLock); + } + } + #endregion + + #region Private Methods + private void Reset() + { + StartTime = DateTime.Now; + EndTime = null; + CheckedTimestamps.Clear(); + + SoftCts.Dispose(); + HardCts.Dispose(); + SoftCts = new CancellationTokenSource(); + HardCts = new CancellationTokenSource(); + } + #endregion + + /// + public void Dispose() + { + GC.SuppressFinalize(this); + SoftCts.Dispose(); + HardCts.Dispose(); } } diff --git a/RuriLib.Parallelization/ParallelizerFactory.cs b/RuriLib.Parallelization/ParallelizerFactory.cs index acbe630bf..8992b35d2 100644 --- a/RuriLib.Parallelization/ParallelizerFactory.cs +++ b/RuriLib.Parallelization/ParallelizerFactory.cs @@ -3,42 +3,41 @@ using System.Threading; using System.Threading.Tasks; -namespace RuriLib.Parallelization +namespace RuriLib.Parallelization; + +/// +/// Collection of factory methods to help create parallelizers. +/// +/// The type of input that the parallelizer accepts +/// The type of output that the parallelizer produces +public static class ParallelizerFactory { /// - /// Collection of factory methods to help create parallelizers. + /// Creates a parallelizer from the given settings. /// - /// The type of input that the parallelizer accepts - /// The type of output that the parallelizer produces - public static class ParallelizerFactory + /// The type of parallelizer to use + /// The collection of items that need to be processed in parallel + /// The work function that asynchronously processes each item and produces an output + /// The maximum number of items that can be processed concurrently + /// The total amount of items that are expected to be enumerated (for Progress purposes) + /// The amount of items to skip from the beginning (to restore previously aborted sessions) + /// The maximum value that can assume when it is + /// changed with the method + public static Parallelizer Create(ParallelizerType type, + IEnumerable workItems, Func> workFunction, + int degreeOfParallelism, long totalAmount, int skip = 0, int maxDegreeOfParallelism = 200) { - /// - /// Creates a parallelizer from the given settings. - /// - /// The type of parallelizer to use - /// The collection of items that need to be processed in parallel - /// The work function that asynchronously processes each item and produces an output - /// The maximum number of items that can be processed concurrently - /// The total amount of items that are expected to be enumerated (for Progress purposes) - /// The amount of items to skip from the beginning (to restore previously aborted sessions) - /// The maximum value that can assume when it is - /// changed with the method - public static Parallelizer Create(ParallelizerType type, - IEnumerable workItems, Func> workFunction, - int degreeOfParallelism, long totalAmount, int skip = 0, int maxDegreeOfParallelism = 200) + var pType = type switch { - var pType = type switch - { - ParallelizerType.TaskBased => typeof(TaskBasedParallelizer), - ParallelizerType.ThreadBased => typeof(ThreadBasedParallelizer), - ParallelizerType.ParallelBased => typeof(ParallelBasedParallelizer), - _ => throw new NotImplementedException() - }; + ParallelizerType.TaskBased => typeof(TaskBasedParallelizer), + ParallelizerType.ThreadBased => typeof(ThreadBasedParallelizer), + ParallelizerType.ParallelBased => typeof(ParallelBasedParallelizer), + _ => throw new NotImplementedException() + }; - var instance = Activator.CreateInstance(pType, workItems, workFunction, degreeOfParallelism, - totalAmount, skip, maxDegreeOfParallelism); + var instance = Activator.CreateInstance(pType, workItems, workFunction, degreeOfParallelism, + totalAmount, skip, maxDegreeOfParallelism); - return instance as Parallelizer; - } + return (instance as Parallelizer)!; } } diff --git a/RuriLib.Parallelization/ParallelizerStatus.cs b/RuriLib.Parallelization/ParallelizerStatus.cs index 374f2a9b4..062ff2e4a 100644 --- a/RuriLib.Parallelization/ParallelizerStatus.cs +++ b/RuriLib.Parallelization/ParallelizerStatus.cs @@ -1,43 +1,42 @@ -namespace RuriLib.Parallelization +namespace RuriLib.Parallelization; + +/// +/// The status of the parallelizer. +/// +public enum ParallelizerStatus { /// - /// The status of the parallelizer. + /// The parallelizer has not started yet. /// - public enum ParallelizerStatus - { - /// - /// The parallelizer has not started yet. - /// - Idle, + Idle, - /// - /// The parallelizer is starting up. - /// - Starting, + /// + /// The parallelizer is starting up. + /// + Starting, - /// - /// The parallelizer is processing the workload. - /// - Running, + /// + /// The parallelizer is processing the workload. + /// + Running, - /// - /// The parallelizer is pausing the workload. - /// - Pausing, + /// + /// The parallelizer is pausing the workload. + /// + Pausing, - /// - /// The parallelizer is paused. - /// - Paused, + /// + /// The parallelizer is paused. + /// + Paused, - /// - /// The parallelizer is stopping the workload. - /// - Stopping, + /// + /// The parallelizer is stopping the workload. + /// + Stopping, - /// - /// The parallelizer is recovering from a paused state. - /// - Resuming - } + /// + /// The parallelizer is recovering from a paused state. + /// + Resuming } diff --git a/RuriLib.Parallelization/ParallelizerType.cs b/RuriLib.Parallelization/ParallelizerType.cs index 470cea087..218119a23 100644 --- a/RuriLib.Parallelization/ParallelizerType.cs +++ b/RuriLib.Parallelization/ParallelizerType.cs @@ -1,23 +1,22 @@ -namespace RuriLib.Parallelization +namespace RuriLib.Parallelization; + +/// +/// The types of parallelizing techniques available. +/// +public enum ParallelizerType { /// - /// The types of parallelizing techniques available. + /// Uses tasks to parallelize work. /// - public enum ParallelizerType - { - /// - /// Uses tasks to parallelize work. - /// - TaskBased, + TaskBased, - /// - /// Uses threads to parallelize work. - /// - ThreadBased, + /// + /// Uses threads to parallelize work. + /// + ThreadBased, - /// - /// Uses Parallel.ForEachAsync to parallelize work. - /// - ParallelBased - } + /// + /// Uses Parallel.ForEachAsync to parallelize work. + /// + ParallelBased } diff --git a/RuriLib.Parallelization/RuriLib.Parallelization.csproj b/RuriLib.Parallelization/RuriLib.Parallelization.csproj index f71b3f118..b06e1c1a7 100644 --- a/RuriLib.Parallelization/RuriLib.Parallelization.csproj +++ b/RuriLib.Parallelization/RuriLib.Parallelization.csproj @@ -9,6 +9,7 @@ https://github.com/openbullet/OpenBullet2/tree/master/RuriLib.Parallelization parallelization; task; tasks; manager; taskmanager; multithreading; parallel; multithread; async; 1.0.6 + enable True diff --git a/RuriLib.Parallelization/TaskBasedParallelizer.cs b/RuriLib.Parallelization/TaskBasedParallelizer.cs index d7055ce2b..26473ce56 100644 --- a/RuriLib.Parallelization/TaskBasedParallelizer.cs +++ b/RuriLib.Parallelization/TaskBasedParallelizer.cs @@ -5,217 +5,226 @@ using System.Threading; using System.Threading.Tasks; -namespace RuriLib.Parallelization +namespace RuriLib.Parallelization; + +/// +/// Parallelizer that expoits batches of multiple tasks and the WaitAll function. +/// +public class TaskBasedParallelizer : Parallelizer { - /// - /// Parallelizer that expoits batches of multiple tasks and the WaitAll function. - /// - public class TaskBasedParallelizer : Parallelizer + #region Private Fields + private int BatchSize => MaxDegreeOfParallelism * 2; + private SemaphoreSlim? _semaphore; + private readonly ConcurrentQueue _queue = new(); + private int _savedDop; + private bool _dopDecreaseRequested; + #endregion + + #region Constructors + /// + public TaskBasedParallelizer(IEnumerable workItems, Func> workFunction, + int degreeOfParallelism, long totalAmount, int skip = 0, int maxDegreeOfParallelism = 200) + : base(workItems, workFunction, degreeOfParallelism, totalAmount, skip, maxDegreeOfParallelism) { - #region Private Fields - private int BatchSize => MaxDegreeOfParallelism * 2; - private SemaphoreSlim semaphore; - private ConcurrentQueue queue; - private int savedDOP; - private bool dopDecreaseRequested; - #endregion - - #region Constructors - /// - public TaskBasedParallelizer(IEnumerable workItems, Func> workFunction, - int degreeOfParallelism, long totalAmount, int skip = 0, int maxDegreeOfParallelism = 200) - : base(workItems, workFunction, degreeOfParallelism, totalAmount, skip, maxDegreeOfParallelism) - { - } - #endregion + } + #endregion - #region Public Methods - /// - public async override Task Start() - { - await base.Start().ConfigureAwait(false); + #region Public Methods + /// + public override async Task Start() + { + await base.Start().ConfigureAwait(false); - stopwatch.Restart(); - Status = ParallelizerStatus.Running; - _ = Task.Run(() => Run()).ConfigureAwait(false); - } + Stopwatch.Restart(); + Status = ParallelizerStatus.Running; + _ = Task.Run(Run).ConfigureAwait(false); + } - /// - public async override Task Pause() - { - await base.Pause().ConfigureAwait(false); + /// + public override async Task Pause() + { + await base.Pause().ConfigureAwait(false); - Status = ParallelizerStatus.Pausing; - savedDOP = degreeOfParallelism; - await ChangeDegreeOfParallelism(0).ConfigureAwait(false); - Status = ParallelizerStatus.Paused; - stopwatch.Stop(); - } + Status = ParallelizerStatus.Pausing; + _savedDop = DegreeOfParallelism; + await ChangeDegreeOfParallelism(0).ConfigureAwait(false); + Status = ParallelizerStatus.Paused; + Stopwatch.Stop(); + } - /// - public async override Task Resume() - { - await base.Resume().ConfigureAwait(false); + /// + public override async Task Resume() + { + await base.Resume().ConfigureAwait(false); - Status = ParallelizerStatus.Resuming; - await ChangeDegreeOfParallelism(savedDOP).ConfigureAwait(false); - Status = ParallelizerStatus.Running; - stopwatch.Start(); - } + Status = ParallelizerStatus.Resuming; + await ChangeDegreeOfParallelism(_savedDop).ConfigureAwait(false); + Status = ParallelizerStatus.Running; + Stopwatch.Start(); + } - /// - public async override Task Stop() - { - await base.Stop().ConfigureAwait(false); + /// + public override async Task Stop() + { + await base.Stop().ConfigureAwait(false); - Status = ParallelizerStatus.Stopping; - softCTS.Cancel(); - await WaitCompletion().ConfigureAwait(false); - } + Status = ParallelizerStatus.Stopping; + await SoftCts.CancelAsync(); + await WaitCompletion().ConfigureAwait(false); + } - /// - public async override Task Abort() - { - await base.Abort().ConfigureAwait(false); + /// + public override async Task Abort() + { + await base.Abort().ConfigureAwait(false); - Status = ParallelizerStatus.Stopping; - hardCTS.Cancel(); - softCTS.Cancel(); - await WaitCompletion().ConfigureAwait(false); - } + Status = ParallelizerStatus.Stopping; + await HardCts.CancelAsync(); + await SoftCts.CancelAsync(); + await WaitCompletion().ConfigureAwait(false); + } - /// - public async override Task ChangeDegreeOfParallelism(int newValue) - { - await base.ChangeDegreeOfParallelism(newValue); + /// + public override async Task ChangeDegreeOfParallelism(int newValue) + { + await base.ChangeDegreeOfParallelism(newValue); - if (Status == ParallelizerStatus.Idle) - { - degreeOfParallelism = newValue; + switch (Status) + { + case ParallelizerStatus.Idle: + DegreeOfParallelism = newValue; return; - } - else if (Status == ParallelizerStatus.Paused) - { - savedDOP = newValue; + + case ParallelizerStatus.Paused: + _savedDop = newValue; return; - } + } - if (newValue == degreeOfParallelism) - { - return; - } - else if (newValue > degreeOfParallelism) - { - semaphore.Release(newValue - degreeOfParallelism); - } - else + if (newValue == DegreeOfParallelism) + { + return; + } + + if (_semaphore is null) + { + DegreeOfParallelism = newValue; + return; + } + + if (newValue > DegreeOfParallelism) + { + _semaphore.Release(newValue - DegreeOfParallelism); + } + else + { + _dopDecreaseRequested = true; + for (var i = 0; i < DegreeOfParallelism - newValue; ++i) { - dopDecreaseRequested = true; - for (var i = 0; i < degreeOfParallelism - newValue; ++i) - { - await semaphore.WaitAsync().ConfigureAwait(false); - } - dopDecreaseRequested = false; + await _semaphore.WaitAsync().ConfigureAwait(false); } - - degreeOfParallelism = newValue; + _dopDecreaseRequested = false; } - #endregion - #region Private Methods - // Run is executed in fire and forget mode (not awaited) - private async void Run() - { - semaphore = new SemaphoreSlim(degreeOfParallelism, MaxDegreeOfParallelism); - dopDecreaseRequested = false; + DegreeOfParallelism = newValue; + } + #endregion - // Skip the items - using var items = workItems.Skip(skip).GetEnumerator(); + #region Private Methods + // Run is executed in fire and forget mode (not awaited) + private async void Run() + { + _dopDecreaseRequested = false; - // Create the queue - queue = new ConcurrentQueue(); + // Skip the items + using var items = WorkItems.Skip(Skip).GetEnumerator(); - // Enqueue the first batch (at most BatchSize items) - while (queue.Count < BatchSize && items.MoveNext()) - { - queue.Enqueue(items.Current); - } + // Clear the queue + _queue.Clear(); - try - { - // While there are items in the queue and we didn't cancel, dequeue one, wait and then - // queue another task if there are more to queue - while (!queue.IsEmpty && !softCTS.IsCancellationRequested) - { - WAIT: + // Enqueue the first batch (at most BatchSize items) + while (_queue.Count < BatchSize && items.MoveNext()) + { + _queue.Enqueue(items.Current); + } - // Wait for the semaphore - await semaphore.WaitAsync(softCTS.Token).ConfigureAwait(false); + _semaphore = new SemaphoreSlim(DegreeOfParallelism, MaxDegreeOfParallelism); + + try + { + // While there are items in the queue, and we didn't cancel, dequeue one, wait and then + // queue another task if there are more to queue + while (!_queue.IsEmpty && !SoftCts.IsCancellationRequested) + { + WAIT: - if (softCTS.IsCancellationRequested) - break; + // Wait for the semaphore + await _semaphore!.WaitAsync(SoftCts.Token).ConfigureAwait(false); - if (dopDecreaseRequested || IsCPMLimited()) - { - UpdateCPM(); - semaphore?.Release(); - goto WAIT; - } + if (SoftCts.IsCancellationRequested) + break; - // If the current batch is running out - if (queue.Count < MaxDegreeOfParallelism) - { - // Queue more items until the BatchSize is reached OR until the enumeration finished - while (queue.Count < BatchSize && items.MoveNext()) - { - queue.Enqueue(items.Current); - } - } + if (_dopDecreaseRequested || IsCpmLimited()) + { + UpdateCpm(); + _semaphore!.Release(); + goto WAIT; + } - // If we can dequeue an item, run it - if (queue.TryDequeue(out TInput item)) - { - // The task will release its slot no matter what - _ = taskFunction.Invoke(item) - .ContinueWith(_ => semaphore?.Release()) - .ConfigureAwait(false); - } - else + // If the current batch is running out + if (_queue.Count < MaxDegreeOfParallelism) + { + // Queue more items until the BatchSize is reached OR until the enumeration finished + while (_queue.Count < BatchSize && items.MoveNext()) { - semaphore?.Release(); + _queue.Enqueue(items.Current); } } - // Wait for every remaining task from the last batch to finish unless aborted - while (Progress < 1 && !hardCTS.IsCancellationRequested) + // If we can dequeue an item, run it + if (_queue.TryDequeue(out var item)) { - await Task.Delay(100).ConfigureAwait(false); + // The task will release its slot no matter what + _ = TaskFunction.Invoke(item) + // ReSharper disable once AccessToDisposedClosure + // (the semaphore is only disposed after the loop finishes) + .ContinueWith(_ => _semaphore?.Release()) + .ConfigureAwait(false); } - } - catch (OperationCanceledException) - { - // Wait for current tasks to finish unless aborted - while (semaphore.CurrentCount < degreeOfParallelism && !hardCTS.IsCancellationRequested) + else { - await Task.Delay(100).ConfigureAwait(false); + _semaphore?.Release(); } } - catch (Exception ex) + + // Wait for every remaining task from the last batch to finish unless aborted + while (Progress < 1 && !HardCts.IsCancellationRequested) { - OnError(ex); + await Task.Delay(100).ConfigureAwait(false); } - finally + } + catch (OperationCanceledException) + { + // Wait for current tasks to finish unless aborted + while (_semaphore!.CurrentCount < DegreeOfParallelism && !HardCts.IsCancellationRequested) { - OnCompleted(); - Status = ParallelizerStatus.Idle; - hardCTS?.Dispose(); - softCTS?.Dispose(); - semaphore?.Dispose(); - semaphore = null; - stopwatch?.Stop(); + await Task.Delay(100).ConfigureAwait(false); } } - #endregion + catch (Exception ex) + { + OnError(ex); + } + finally + { + OnCompleted(); + Status = ParallelizerStatus.Idle; + HardCts.Dispose(); + SoftCts.Dispose(); + _semaphore?.Dispose(); + _semaphore = null; + Stopwatch.Stop(); + } } -} \ No newline at end of file + #endregion +} diff --git a/RuriLib.Parallelization/ThreadBasedParallelizer.cs b/RuriLib.Parallelization/ThreadBasedParallelizer.cs index 05a1422ec..0296006ab 100644 --- a/RuriLib.Parallelization/ThreadBasedParallelizer.cs +++ b/RuriLib.Parallelization/ThreadBasedParallelizer.cs @@ -4,188 +4,190 @@ using System.Threading; using System.Threading.Tasks; -namespace RuriLib.Parallelization +namespace RuriLib.Parallelization; + +/// +/// Parallelizer that expoits a custom pool of threads. +/// +public class ThreadBasedParallelizer : Parallelizer { - /// - /// Parallelizer that expoits a custom pool of threads. - /// - public class ThreadBasedParallelizer : Parallelizer + #region Private Fields + private readonly List _threadPool = []; + #endregion + + #region Constructors + /// + public ThreadBasedParallelizer(IEnumerable workItems, Func> workFunction, + int degreeOfParallelism, long totalAmount, int skip = 0, int maxDegreeOfParallelism = 200) + : base(workItems, workFunction, degreeOfParallelism, totalAmount, skip, maxDegreeOfParallelism) { - #region Private Fields - private readonly List threadPool = new(); - #endregion - - #region Constructors - /// - public ThreadBasedParallelizer(IEnumerable workItems, Func> workFunction, - int degreeOfParallelism, long totalAmount, int skip = 0, int maxDegreeOfParallelism = 200) - : base(workItems, workFunction, degreeOfParallelism, totalAmount, skip, maxDegreeOfParallelism) - { - } - #endregion + } + #endregion - #region Public Methods - /// - public async override Task Start() - { - await base.Start(); + #region Public Methods + /// + public override async Task Start() + { + await base.Start(); - stopwatch.Restart(); - Status = ParallelizerStatus.Running; - _ = Task.Run(() => Run()).ConfigureAwait(false); - } + Stopwatch.Restart(); + Status = ParallelizerStatus.Running; + _ = Task.Run(Run).ConfigureAwait(false); + } - /// - public async override Task Pause() - { - await base.Pause(); + /// + public override async Task Pause() + { + await base.Pause(); - Status = ParallelizerStatus.Pausing; - await WaitCurrentWorkCompletion(); - Status = ParallelizerStatus.Paused; - stopwatch.Stop(); - } + Status = ParallelizerStatus.Pausing; + await WaitCurrentWorkCompletion(); + Status = ParallelizerStatus.Paused; + Stopwatch.Stop(); + } - /// - public async override Task Resume() - { - await base.Resume(); + /// + public override async Task Resume() + { + await base.Resume(); - Status = ParallelizerStatus.Running; - stopwatch.Start(); - } + Status = ParallelizerStatus.Running; + Stopwatch.Start(); + } - /// - public async override Task Stop() - { - await base.Stop(); + /// + public override async Task Stop() + { + await base.Stop(); - Status = ParallelizerStatus.Stopping; - softCTS.Cancel(); - await WaitCompletion().ConfigureAwait(false); - stopwatch.Stop(); - } + Status = ParallelizerStatus.Stopping; + await SoftCts.CancelAsync(); + await WaitCompletion().ConfigureAwait(false); + Stopwatch.Stop(); + } - /// - public async override Task Abort() - { - await base.Abort(); + /// + public override async Task Abort() + { + await base.Abort(); - Status = ParallelizerStatus.Stopping; - hardCTS.Cancel(); - softCTS.Cancel(); - await WaitCompletion().ConfigureAwait(false); - stopwatch.Stop(); - } + Status = ParallelizerStatus.Stopping; + await HardCts.CancelAsync(); + await SoftCts.CancelAsync(); + await WaitCompletion().ConfigureAwait(false); + Stopwatch.Stop(); + } - /// - public async override Task ChangeDegreeOfParallelism(int newValue) - { - await base.ChangeDegreeOfParallelism(newValue); + /// + public override async Task ChangeDegreeOfParallelism(int newValue) + { + await base.ChangeDegreeOfParallelism(newValue); - degreeOfParallelism = newValue; - } - #endregion + DegreeOfParallelism = newValue; + } + #endregion + + #region Private Methods + // Run is executed in fire and forget mode (not awaited) + private async void Run() + { + // Skip the items + using var items = WorkItems.Skip(Skip).GetEnumerator(); - #region Private Methods - // Run is executed in fire and forget mode (not awaited) - private async void Run() + while (items.MoveNext()) { - // Skip the items - using var items = workItems.Skip(skip).GetEnumerator(); + WAIT: - while (items.MoveNext()) + // If we paused, stay idle + if (Status is ParallelizerStatus.Pausing or ParallelizerStatus.Paused) { - WAIT: + await Task.Delay(1000); + goto WAIT; + } + + // If we canceled the loop + if (SoftCts.IsCancellationRequested) + { + break; + } - // If we paused, stay idle - if (Status == ParallelizerStatus.Pausing || Status == ParallelizerStatus.Paused) + // If we haven't filled the thread pool yet, start a new thread + // (e.g. if we're at the beginning or the increased the DOP) + if (_threadPool.Count < DegreeOfParallelism) + { + StartNewThread(items.Current); + } + // Otherwise if we already filled the thread pool + else + { + // If we exceeded the CPM threshold, update CPM and go back to waiting + if (IsCpmLimited()) { - await Task.Delay(1000); + UpdateCpm(); + await Task.Delay(100); goto WAIT; } - // If we canceled the loop - if (softCTS.IsCancellationRequested) + // Search for the first idle thread + var firstFree = _threadPool.FirstOrDefault(t => !t.IsAlive); + + // If there is none, go back to waiting + if (firstFree == null) { - break; + await Task.Delay(100); + goto WAIT; } - // If we haven't filled the thread pool yet, start a new thread - // (e.g. if we're at the beginning or the increased the DOP) - if (threadPool.Count < degreeOfParallelism) + // Otherwise remove it + _threadPool.Remove(firstFree); + + // If there's space for a new thread, start it + if (_threadPool.Count < DegreeOfParallelism) { StartNewThread(items.Current); } - // Otherwise if we already filled the thread pool + // Otherwise go back to waiting else { - // If we exceeded the CPM threshold, update CPM and go back to waiting - if (IsCPMLimited()) - { - UpdateCPM(); - await Task.Delay(100); - goto WAIT; - } - - // Search for the first idle thread - var firstFree = threadPool.FirstOrDefault(t => !t.IsAlive); - - // If there is none, go back to waiting - if (firstFree == null) - { - await Task.Delay(100); - goto WAIT; - } - - // Otherwise remove it - threadPool.Remove(firstFree); - - // If there's space for a new thread, start it - if (threadPool.Count < degreeOfParallelism) - { - StartNewThread(items.Current); - } - // Otherwise go back to waiting - else - { - await Task.Delay(100); - goto WAIT; - } + await Task.Delay(100); + goto WAIT; } } + } - // Wait until ongoing threads finish - await WaitCurrentWorkCompletion(); + // Wait until ongoing threads finish + await WaitCurrentWorkCompletion(); - OnCompleted(); - Status = ParallelizerStatus.Idle; - hardCTS.Dispose(); - softCTS.Dispose(); - stopwatch.Stop(); - } + OnCompleted(); + Status = ParallelizerStatus.Idle; + HardCts.Dispose(); + SoftCts.Dispose(); + Stopwatch.Stop(); + } - // Creates and starts a thread, given a work item - private void StartNewThread(TInput item) - { - var thread = new Thread(new ParameterizedThreadStart(ThreadWork)); - threadPool.Add(thread); - thread.Start(item); - } + // Creates and starts a thread, given a work item + private void StartNewThread(TInput item) + { + var thread = new Thread(ThreadWork); + _threadPool.Add(thread); + thread.Start(item); + } - // Sync method to be passed to a thread - private void ThreadWork(object input) - => taskFunction((TInput)input).Wait(); + // Sync method to be passed to a thread + private void ThreadWork(object? input) + { + ArgumentNullException.ThrowIfNull(input); + TaskFunction((TInput)input).Wait(); + } - // Wait until the current round is over (if we didn't cancel, it's the last one) - private async Task WaitCurrentWorkCompletion() + // Wait until the current round is over (if we didn't cancel, it's the last one) + private async Task WaitCurrentWorkCompletion() + { + while (_threadPool.Any(t => t.IsAlive)) { - while (threadPool.Any(t => t.IsAlive)) - { - await Task.Delay(100); - } + await Task.Delay(100); } - #endregion } + #endregion }