Skip to content

Commit

Permalink
🧵 Make IActionQueue async (#227)
Browse files Browse the repository at this point in the history
  • Loading branch information
paulcscharf authored Jun 15, 2021
1 parent 25bcaad commit 2291011
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 237 deletions.
129 changes: 12 additions & 117 deletions Bearded.Utilities/Threading/BackgroundActionQueue.cs
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace Bearded.Utilities.Threading
{
/// <summary>
/// A threadsafe queue that runs scheduled actions on a separate thread.
/// A thread-safe queue that runs scheduled actions on a separate thread.
/// Typical usage is to schedule actions from one or multiple threads to have them executed in the background.
/// The actions are guaranteed to be executed in the order they are scheduled.
/// </summary>
public sealed class BackgroundActionQueue : IActionQueue
{
#region Fields and Properties
private readonly ManualActionQueue queue = new ManualActionQueue();

private readonly BlockingCollection<Action> actions = new BlockingCollection<Action>();
private readonly Thread thread;

private bool finishing;
Expand All @@ -23,10 +22,6 @@ public sealed class BackgroundActionQueue : IActionQueue
/// </summary>
public bool Finished { get; private set; }

#endregion

#region Constructor

/// <summary>
/// Creates a new background action queue.
/// </summary>
Expand All @@ -50,136 +45,36 @@ public BackgroundActionQueue(string name, bool backgroundThread)
thread.Start();
}

#endregion

#region Methods

private void run()
{
while (!finishing)
{
actions.Take()();
queue.ExecuteOne();
}
Finished = true;
}

/// <summary>
/// Stops the execution of further actions. Actions already queued will still be run.
/// </summary>
public void Finish()
public Task Finish()
{
Finish(true);
return Finish(true);
}

/// <summary>
/// Stops the execution of further actions.
/// </summary>
/// <param name="executeScheduled">If true, finishes executing all actions currently scheduled, otherwise stops after the next action.</param>
public void Finish(bool executeScheduled)
/// <param name="executeScheduled">If true, finishes executing all actions currently scheduled, otherwise stops after the current action.</param>
public Task Finish(bool executeScheduled)
{
if (!executeScheduled)
finishing = true;
actions.Add(() => finishing = true);
}

/// <summary>
/// Stops this queue from running further action, and aborts any actions currently run.
/// Since it kills the underlying thread, this may not be a safe way to dispose of the queue, and may lead to data corruption.
/// Consider using Finish() instead.
/// </summary>
public void Abort()
{
thread.Abort();
Finished = true;
}

#region IActionQueue

/// <summary>
/// Queues an action to run. Returns immediately.
/// </summary>
/// <param name="action">The action to run.</param>
public void RunAndForget(Action action)
{
actions.Add(action);
}

/// <summary>
/// Queues an action to run. Returns only after the action has been executed.
/// </summary>
/// <param name="action">The action to run.</param>
public void RunAndAwait(Action action)
{
var reset = new ManualResetEvent(false);

actions.Add(() =>
{
action();
reset.Set();
});

reset.WaitOne();
}

/// <summary>
/// Queues a parameterless function to run. Returns the return value of the function only after the function has been executed.
/// </summary>
/// <param name="action">The function to run.</param>
public T RunAndReturn<T>(Func<T> action)
{
var ret = default(T);
RunAndAwait(() => ret = action());
return ret!;
return queue.Run(() => finishing = true);
}

/// <summary>
/// Queues a function with one parameter to run. Returns the return value of the function only after the function has been executed.
/// </summary>
/// <param name="action">The function to run.</param>
/// <param name="p0">The argument for calling the function.</param>
public T RunAndReturn<TP0, T>(Func<TP0, T> action, TP0 p0)
{
return RunAndReturn(() => action(p0));
}

/// <summary>
/// Queues a function with two parameters to run. Returns the return value of the function only after the function has been executed.
/// </summary>
/// <param name="action">The function to run.</param>
/// <param name="p0">The first argument for calling the function.</param>
/// <param name="p1">The second argument for calling the function.</param>
public T RunAndReturn<TP0, TP1, T>(Func<TP0, TP1, T> action, TP0 p0, TP1 p1)
{
return RunAndReturn(() => action(p0, p1));
}

/// <summary>
/// Queues a function with three parameters to run. Returns the return value of the function only after the function has been executed.
/// </summary>
/// <param name="action">The function to run.</param>
/// <param name="p0">The first argument for calling the function.</param>
/// <param name="p1">The second argument for calling the function.</param>
/// <param name="p2">The third argument for calling the function.</param>
public T RunAndReturn<TP0, TP1, TP2, T>(Func<TP0, TP1, TP2, T> action, TP0 p0, TP1 p1, TP2 p2)
{
return RunAndReturn(() => action(p0, p1, p2));
}

/// <summary>
/// Queues a function with four parameters to run. Returns the return value of the function only after the function has been executed.
/// </summary>
/// <param name="action">The function to run.</param>
/// <param name="p0">The first argument for calling the function.</param>
/// <param name="p1">The second argument for calling the function.</param>
/// <param name="p2">The third argument for calling the function.</param>
/// <param name="p3">The fourth argument for calling the function.</param>
public T RunAndReturn<TP0, TP1, TP2, TP3, T>(Func<TP0, TP1, TP2, TP3, T> action, TP0 p0, TP1 p1, TP2 p2, TP3 p3)
{
return RunAndReturn(() => action(p0, p1, p2, p3));
}

#endregion

#endregion
public void Queue(Action action) => queue.Queue(action);
public Task Run(Action action) => queue.Run(action);
public Task<T> Run<T>(Func<T> function) => queue.Run(function);
}
}
47 changes: 7 additions & 40 deletions Bearded.Utilities/Threading/IActionQueue.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Threading.Tasks;

namespace Bearded.Utilities.Threading
{
Expand All @@ -11,52 +12,18 @@ public interface IActionQueue
/// Queues an action to run. Returns immediately.
/// </summary>
/// <param name="action">The action to run.</param>
void RunAndForget(Action action);
void Queue(Action action);

/// <summary>
/// Queues an action to run. Returns only after the action has been executed.
/// Queues an action to run. Returns a task that completes when the action has been executed.
/// </summary>
/// <param name="action">The action to run.</param>
void RunAndAwait(Action action);
Task Run(Action action);

/// <summary>
/// Queues a parameterless function to run. Returns the return value of the function only after the function has been executed.
/// Queues a parameterless function to run. Returns a task that completes when the function has been executed.
/// </summary>
/// <param name="action">The function to run.</param>
T RunAndReturn<T>(Func<T> action);

/// <summary>
/// Queues a function with one parameter to run. Returns the return value of the function only after the function has been executed.
/// </summary>
/// <param name="action">The function to run.</param>
/// <param name="p0">The argument for calling the function.</param>
T RunAndReturn<TP0, T>(Func<TP0, T> action, TP0 p0);

/// <summary>
/// Queues a function with two parameters to run. Returns the return value of the function only after the function has been executed.
/// </summary>
/// <param name="action">The function to run.</param>
/// <param name="p0">The first argument for calling the function.</param>
/// <param name="p1">The second argument for calling the function.</param>
T RunAndReturn<TP0, TP1, T>(Func<TP0, TP1, T> action, TP0 p0, TP1 p1);

/// <summary>
/// Queues a function with three parameters to run. Returns the return value of the function only after the function has been executed.
/// </summary>
/// <param name="action">The function to run.</param>
/// <param name="p0">The first argument for calling the function.</param>
/// <param name="p1">The second argument for calling the function.</param>
/// <param name="p2">The third argument for calling the function.</param>
T RunAndReturn<TP0, TP1, TP2, T>(Func<TP0, TP1, TP2, T> action, TP0 p0, TP1 p1, TP2 p2);

/// <summary>
/// Queues a function with four parameters to run. Returns the return value of the function only after the function has been executed.
/// </summary>
/// <param name="action">The function to run.</param>
/// <param name="p0">The first argument for calling the function.</param>
/// <param name="p1">The second argument for calling the function.</param>
/// <param name="p2">The third argument for calling the function.</param>
/// <param name="p3">The fourth argument for calling the function.</param>
T RunAndReturn<TP0, TP1, TP2, TP3, T>(Func<TP0, TP1, TP2, TP3, T> action, TP0 p0, TP1 p1, TP2 p2, TP3 p3);
/// <param name="function">The function to run.</param>
Task<T> Run<T>(Func<T> function);
}
}
95 changes: 15 additions & 80 deletions Bearded.Utilities/Threading/ManualActionQueue.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace Bearded.Utilities.Threading
{
/// <summary>
/// A threadsafe queue to run actions from.
/// A thread-safe queue to run actions from.
/// Typical usage is to schedule actions from multiple threads and execute them on one main thread.
/// However, actions can also be executed by multiple threads.
/// If only one thread is used to execute, the actions are guaranteed to be executed in the order they were scheduled.
Expand All @@ -15,10 +15,6 @@ public sealed class ManualActionQueue : IActionQueue
{
private readonly BlockingCollection<Action> actions = new BlockingCollection<Action>();

#region Methods

#region Execute

/// <summary>
/// Executes one scheduled action.
/// If no action is scheduled, this will wait until one is scheduled, and then execute that.
Expand Down Expand Up @@ -85,96 +81,35 @@ public int ExecuteFor(TimeSpan time)
return executed;
}

#endregion

#region IActionQueue

/// <summary>
/// Queues an action to run. Returns immediately.
/// </summary>
/// <param name="action">The action to run.</param>
public void RunAndForget(Action action)
public void Queue(Action action)
{
actions.Add(action);
}

/// <summary>
/// Queues an action to run. Returns only after the action has been executed.
/// </summary>
/// <param name="action">The action to run.</param>
public void RunAndAwait(Action action)
public Task Run(Action action)
{
var reset = new ManualResetEvent(false);
var task = new TaskCompletionSource<object?>();

actions.Add(() =>
{
action();
reset.Set();
task.SetResult(null);
});

reset.WaitOne();
}

/// <summary>
/// Queues a parameterless function to run. Returns the return value of the function only after the function has been executed.
/// </summary>
/// <param name="action">The function to run.</param>
public T RunAndReturn<T>(Func<T> action)
{
T ret = default;
RunAndAwait(() => ret = action());
return ret!;
}

/// <summary>
/// Queues a function with one parameter to run. Returns the return value of the function only after the function has been executed.
/// </summary>
/// <param name="action">The function to run.</param>
/// <param name="p0">The argument for calling the function.</param>
public T RunAndReturn<TP0, T>(Func<TP0, T> action, TP0 p0)
{
return RunAndReturn(() => action(p0));
return task.Task;
}

/// <summary>
/// Queues a function with two parameters to run. Returns the return value of the function only after the function has been executed.
/// </summary>
/// <param name="action">The function to run.</param>
/// <param name="p0">The first argument for calling the function.</param>
/// <param name="p1">The second argument for calling the function.</param>
public T RunAndReturn<TP0, TP1, T>(Func<TP0, TP1, T> action, TP0 p0, TP1 p1)
public Task<T> Run<T>(Func<T> function)
{
return RunAndReturn(() => action(p0, p1));
}
var task = new TaskCompletionSource<T>();

/// <summary>
/// Queues a function with three parameters to run. Returns the return value of the function only after the function has been executed.
/// </summary>
/// <param name="action">The function to run.</param>
/// <param name="p0">The first argument for calling the function.</param>
/// <param name="p1">The second argument for calling the function.</param>
/// <param name="p2">The third argument for calling the function.</param>
public T RunAndReturn<TP0, TP1, TP2, T>(Func<TP0, TP1, TP2, T> action, TP0 p0, TP1 p1, TP2 p2)
{
return RunAndReturn(() => action(p0, p1, p2));
}
actions.Add(() =>
{
var result = function();
task.SetResult(result);
});

/// <summary>
/// Queues a function with four parameters to run. Returns the return value of the function only after the function has been executed.
/// </summary>
/// <param name="action">The function to run.</param>
/// <param name="p0">The first argument for calling the function.</param>
/// <param name="p1">The second argument for calling the function.</param>
/// <param name="p2">The third argument for calling the function.</param>
/// <param name="p3">The fourth argument for calling the function.</param>
public T RunAndReturn<TP0, TP1, TP2, TP3, T>(Func<TP0, TP1, TP2, TP3, T> action, TP0 p0, TP1 p1, TP2 p2, TP3 p3)
{
return RunAndReturn(() => action(p0, p1, p2, p3));
return task.Task;
}

#endregion

#endregion

}
}

0 comments on commit 2291011

Please sign in to comment.