Skip to content

Commit

Permalink
Refactored RuriLib.Parallelization
Browse files Browse the repository at this point in the history
  • Loading branch information
openbullet committed Sep 7, 2024
1 parent 8097d9f commit edbd635
Show file tree
Hide file tree
Showing 12 changed files with 1,169 additions and 1,109 deletions.
524 changes: 263 additions & 261 deletions RuriLib.Parallelization.Tests/ParallelizerTests.cs

Large diffs are not rendered by default.

60 changes: 41 additions & 19 deletions RuriLib.Parallelization/Exceptions/RequiredStatusException.cs
Original file line number Diff line number Diff line change
@@ -1,31 +1,53 @@
using System;
using System.Linq;

namespace RuriLib.Parallelization.Exceptions
namespace RuriLib.Parallelization.Exceptions;

/// <summary>
/// Exception that is thrown when a method of a <see cref="Parallelizer{TInput, TOutput}"/> can only
/// be executed when its status has one of the specified <see cref="ParallelizerStatus"/> values.
/// </summary>
public class RequiredStatusException : Exception
{
/// <summary>
/// Exception that is thrown when a method of a <see cref="Parallelizer{TInput, TOutput}"/> can only
/// be executed when its status has one of the specified <see cref="ParallelizerStatus"/> values.
/// When the <see cref="Parallelizer{TInput, TOutput}"/> requires a status of <paramref name="requiredStatus"/>
/// but the status was <paramref name="actualStatus"/>.
/// </summary>
public class RequiredStatusException : Exception
private RequiredStatusException(ParallelizerStatus actualStatus, ParallelizerStatus requiredStatus)
: base($"The operation can only be performed when the Task Manager is in a {requiredStatus} status, but the status was {actualStatus}.")
{
/// <summary>
/// When the <see cref="Parallelizer{TInput, TOutput}"/> requires a status of <paramref name="requiredStatus"/>
/// but the status was <paramref name="actualStatus"/>.
/// </summary>
public RequiredStatusException(ParallelizerStatus requiredStatus, ParallelizerStatus actualStatus)
: base($"The operation can only be performed when the Task Manager is in a {requiredStatus} status, but the status was {actualStatus}.")
{

}
}

/// <summary>
/// When the <see cref="Parallelizer{TInput, TOutput}"/> requires a status in the <paramref name="requiredStatuses"/> array
/// but the status was <paramref name="actualStatus"/>.
/// </summary>
public RequiredStatusException(ParallelizerStatus[] requiredStatuses, ParallelizerStatus actualStatus)
: base($"The operation can only be performed when the Task Manager is in one of these statuses: {string.Join(", ", requiredStatuses)}, but the status was {actualStatus}.")
{
/// <summary>
/// When the <see cref="Parallelizer{TInput, TOutput}"/> requires a status in the <paramref name="requiredStatuses"/> array
/// but the status was <paramref name="actualStatus"/>.
/// </summary>
private RequiredStatusException(ParallelizerStatus actualStatus, ParallelizerStatus[] requiredStatuses)
: base($"The operation can only be performed when the Task Manager is in one of these statuses: {string.Join(", ", requiredStatuses)}, but the status was {actualStatus}.")
{

}

/// <summary>
/// Throws a <see cref="RequiredStatusException"/> if the <paramref name="actualStatus"/> is not equal to the <paramref name="requiredStatus"/>.
/// </summary>
public static void ThrowIfNot(ParallelizerStatus actualStatus, ParallelizerStatus requiredStatus)
{
if (actualStatus != requiredStatus)
{
throw new RequiredStatusException(actualStatus, requiredStatus);
}
}

/// <summary>
/// Throws a <see cref="RequiredStatusException"/> if the <paramref name="actualStatus"/> is not one of the <paramref name="requiredStatuses"/>.
/// </summary>
public static void ThrowIfNot(ParallelizerStatus actualStatus, ParallelizerStatus[] requiredStatuses)
{
if (!requiredStatuses.Contains(actualStatus))
{
throw new RequiredStatusException(actualStatus, requiredStatuses);
}
}
}
37 changes: 18 additions & 19 deletions RuriLib.Parallelization/Models/ErrorDetails.cs
Original file line number Diff line number Diff line change
@@ -1,27 +1,26 @@
using System;

namespace RuriLib.Parallelization.Models
namespace RuriLib.Parallelization.Models;

/// <summary>
/// Details of an error that happened while processing a work item.
/// </summary>
/// <typeparam name="TInput"></typeparam>
public class ErrorDetails<TInput>
{
/// <summary>The item that was being processed by the operation.</summary>
public TInput Item { get; set; }

/// <summary>The exception thrown by the operation.</summary>
public Exception Exception { get; set; }

/// <summary>
/// Details of an error that happened while processing a work item.
/// Creates error details for a given <paramref name="item"/> for which the work
/// function generated a given <paramref name="exception"/>
/// </summary>
/// <typeparam name="TInput"></typeparam>
public class ErrorDetails<TInput>
public ErrorDetails(TInput item, Exception exception)
{
/// <summary>The item that was being processed by the operation.</summary>
public TInput Item { get; set; }

/// <summary>The exception thrown by the operation.</summary>
public Exception Exception { get; set; }

/// <summary>
/// Creates error details for a given <paramref name="item"/> for which the work
/// function generated a given <paramref name="exception"/>
/// </summary>
public ErrorDetails(TInput item, Exception exception)
{
Item = item;
Exception = exception;
}
Item = item;
Exception = exception;
}
}
39 changes: 19 additions & 20 deletions RuriLib.Parallelization/Models/ResultDetails.cs
Original file line number Diff line number Diff line change
@@ -1,26 +1,25 @@
namespace RuriLib.Parallelization.Models
namespace RuriLib.Parallelization.Models;

/// <summary>
/// Details the result of the execution of the work functions on a given input.
/// </summary>
/// <typeparam name="TInput">The type of input</typeparam>
/// <typeparam name="TOutput">The type of output</typeparam>
public class ResultDetails<TInput, TOutput>
{
/// <summary>The item that was being processed by the operation.</summary>
public TInput Item { get; set; }

/// <summary>The result returned by the operation.</summary>
public TOutput Result { get; set; }

/// <summary>
/// Details the result of the execution of the work functions on a given input.
/// Creates result details for a given <paramref name="item"/> for which
/// the work function generated a given <paramref name="result"/>.
/// </summary>
/// <typeparam name="TInput">The type of input</typeparam>
/// <typeparam name="TOutput">The type of output</typeparam>
public class ResultDetails<TInput, TOutput>
public ResultDetails(TInput item, TOutput result)
{
/// <summary>The item that was being processed by the operation.</summary>
public TInput Item { get; set; }

/// <summary>The result returned by the operation.</summary>
public TOutput Result { get; set; }

/// <summary>
/// Creates result details for a given <paramref name="item"/> for which
/// the work function generated a given <paramref name="result"/>.
/// </summary>
public ResultDetails(TInput item, TOutput result)
{
Item = item;
Result = result;
}
Item = item;
Result = result;
}
}
188 changes: 93 additions & 95 deletions RuriLib.Parallelization/ParallelBasedParallelizer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,121 +4,119 @@
using System.Threading;
using System.Threading.Tasks;

namespace RuriLib.Parallelization
namespace RuriLib.Parallelization;

/// <summary>
/// Parallelizer that uses the Parallel.ForEachAsync function.
/// </summary>
public class ParallelBasedParallelizer<TInput, TOutput> : Parallelizer<TInput, TOutput>
{
/// <summary>
/// Parallelizer that uses the Parallel.ForEachAsync function.
/// </summary>
public class ParallelBasedParallelizer<TInput, TOutput> : Parallelizer<TInput, TOutput>
#region Constructors
/// <inheritdoc/>
public ParallelBasedParallelizer(IEnumerable<TInput> workItems, Func<TInput, CancellationToken, Task<TOutput>> workFunction,
int degreeOfParallelism, long totalAmount, int skip = 0, int maxDegreeOfParallelism = 200)
: base(workItems, workFunction, degreeOfParallelism, totalAmount, skip, maxDegreeOfParallelism)
{
#region Constructors
/// <inheritdoc/>
public ParallelBasedParallelizer(IEnumerable<TInput> workItems, Func<TInput, CancellationToken, Task<TOutput>> workFunction,
int degreeOfParallelism, long totalAmount, int skip = 0, int maxDegreeOfParallelism = 200)
: base(workItems, workFunction, degreeOfParallelism, totalAmount, skip, maxDegreeOfParallelism)
{

}
#endregion

#region Public Methods
/// <inheritdoc/>
public async override Task Start()
{
await base.Start().ConfigureAwait(false);
}
#endregion

stopwatch.Restart();
Status = ParallelizerStatus.Running;
_ = Task.Run(() => Run()).ConfigureAwait(false);
}
#region Public Methods
/// <inheritdoc/>
public override async Task Start()
{
await base.Start().ConfigureAwait(false);

/// <inheritdoc/>
public async override Task Pause()
{
await base.Pause().ConfigureAwait(false);
Stopwatch.Restart();
Status = ParallelizerStatus.Running;
_ = Task.Run(Run).ConfigureAwait(false);
}

throw new NotSupportedException("This parallelizer does not support pausing");
}
/// <inheritdoc/>
public override async Task Pause()
{
await base.Pause().ConfigureAwait(false);

/// <inheritdoc/>
public async override Task Resume()
{
await base.Resume().ConfigureAwait(false);
throw new NotSupportedException("This parallelizer does not support pausing");
}

throw new NotSupportedException("This parallelizer does not support resuming");
}
/// <inheritdoc/>
public override async Task Resume()
{
await base.Resume().ConfigureAwait(false);

/// <inheritdoc/>
public async override Task Stop()
{
await base.Stop().ConfigureAwait(false);
throw new NotSupportedException("This parallelizer does not support resuming");
}

throw new NotSupportedException("This parallelizer does not support soft stopping");
}
/// <inheritdoc/>
public override async Task Stop()
{
await base.Stop().ConfigureAwait(false);

/// <inheritdoc/>
public async override Task Abort()
{
await base.Abort().ConfigureAwait(false);
throw new NotSupportedException("This parallelizer does not support soft stopping");
}

Status = ParallelizerStatus.Stopping;
hardCTS.Cancel();
softCTS.Cancel();
await WaitCompletion().ConfigureAwait(false);
}
/// <inheritdoc/>
public override async Task Abort()
{
await base.Abort().ConfigureAwait(false);

/// <inheritdoc/>
public async override Task ChangeDegreeOfParallelism(int newValue)
{
await base.ChangeDegreeOfParallelism(newValue);
Status = ParallelizerStatus.Stopping;
await HardCts.CancelAsync();
await SoftCts.CancelAsync();
await WaitCompletion().ConfigureAwait(false);
}

if (Status == ParallelizerStatus.Idle)
{
degreeOfParallelism = newValue;
return;
}
/// <inheritdoc/>
public override async Task ChangeDegreeOfParallelism(int newValue)
{
await base.ChangeDegreeOfParallelism(newValue);

if (Status != ParallelizerStatus.Idle)
{
throw new NotSupportedException("You cannot change the DoP while this parallelizer is running");
}
#endregion

DegreeOfParallelism = newValue;
}
#endregion

#region Private Methods
// Run is executed in fire and forget mode (not awaited)
private async void Run()
{
// Skip the items
var items = workItems.Skip(skip);
#region Private Methods
// Run is executed in fire and forget mode (not awaited)
private async void Run()
{
// Skip the items
var items = WorkItems.Skip(Skip);

try
{
var options = new ParallelOptions
{
MaxDegreeOfParallelism = degreeOfParallelism,
TaskScheduler = TaskScheduler.Default,
CancellationToken = hardCTS.Token
};
await Parallel.ForEachAsync(items, options, async (item, token) =>
{
await taskFunction(item).ConfigureAwait(false);
});
}
catch (TaskCanceledException)
{
// Operation aborted, don't throw the error
}
catch (Exception ex)
try
{
var options = new ParallelOptions
{
OnError(ex);
}
finally
MaxDegreeOfParallelism = DegreeOfParallelism,
TaskScheduler = TaskScheduler.Default,
CancellationToken = HardCts.Token
};
await Parallel.ForEachAsync(items, options, async (item, _) =>
{
OnCompleted();
Status = ParallelizerStatus.Idle;
hardCTS?.Dispose();
softCTS?.Dispose();
stopwatch?.Stop();
}
await TaskFunction(item).ConfigureAwait(false);
});
}
catch (TaskCanceledException)
{
// Operation aborted, don't throw the error
}
catch (Exception ex)
{
OnError(ex);
}
finally
{
OnCompleted();
Status = ParallelizerStatus.Idle;
HardCts.Dispose();
SoftCts.Dispose();
Stopwatch.Stop();
}
#endregion
}
}
#endregion
}
Loading

0 comments on commit edbd635

Please sign in to comment.