Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switched to AsyncKeyedLock #877

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 18 additions & 32 deletions RuriLib/Helpers/AsyncLocker.cs
Original file line number Diff line number Diff line change
@@ -1,46 +1,32 @@
using System;
using System.Collections.Generic;
using AsyncKeyedLock;
using System;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

namespace RuriLib.Helpers
{
public class AsyncLocker : IDisposable
public static class AsyncLocker
{
private readonly Dictionary<string, SemaphoreSlim> semaphores = new();

public Task Acquire(string key, CancellationToken cancellationToken = default)
private static readonly AsyncKeyedLocker<string> semaphores = new(o =>
{
if (!semaphores.ContainsKey(key))
{
semaphores[key] = new SemaphoreSlim(1, 1);
}

return semaphores[key].WaitAsync(cancellationToken);
}
o.PoolSize = 20;
o.PoolInitialFill = 1;
});

public Task Acquire(Type classType, string methodName, CancellationToken cancellationToken = default)
=> Acquire(CombineTypes(classType, methodName), cancellationToken);
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static ValueTask<IDisposable> LockAsync(string key, CancellationToken cancellationToken) => semaphores.LockAsync(key, cancellationToken);

public void Release(string key) => semaphores[key].Release();
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static ValueTask<IDisposable> LockAsync(string key) => semaphores.LockAsync(key);

public void Release(Type classType, string methodName) => Release(CombineTypes(classType, methodName));
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static ValueTask<IDisposable> LockAsync(Type classType, string methodName, CancellationToken cancellationToken) => semaphores.LockAsync(CombineTypes(classType, methodName), cancellationToken);

private string CombineTypes(Type classType, string methodName) => $"{classType.FullName}.{methodName}";

public void Dispose()
{
foreach (var semaphore in semaphores.Values)
{
try
{
semaphore.Dispose();
}
catch
{
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static ValueTask<IDisposable> LockAsync(Type classType, string methodName) => semaphores.LockAsync(CombineTypes(classType, methodName));

}
}
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static string CombineTypes(Type classType, string methodName) => $"{classType.FullName}.{methodName}";
}
}
1 change: 0 additions & 1 deletion RuriLib/Models/Bots/BotData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ public class BotData
public IBotLogger Logger { get; set; }
public Random Random { get; }
public CancellationToken CancellationToken { get; set; }
public AsyncLocker AsyncLocker { get; set; }
public Stepper Stepper { get; set; }
public decimal CaptchaCredit { get; set; } = 0;
public string ExecutionInfo { get; set; } = "IDLE";
Expand Down
3 changes: 0 additions & 3 deletions RuriLib/Models/Debugger/ConfigDebugger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ public async Task Run()
var pco = (PythonCompilerOptions)pyengine.GetCompilerOptions();
pco.Module &= ~ModuleOptions.Optimized;
data.SetObject("ironPyEngine", pyengine);
data.AsyncLocker = new();

dynamic globals = new ExpandoObject();

Expand Down Expand Up @@ -324,8 +323,6 @@ public async Task Run()
{
resource.Dispose();
}

data.AsyncLocker.Dispose();
}

Status = ConfigDebuggerStatus.Idle;
Expand Down
72 changes: 14 additions & 58 deletions RuriLib/Models/Jobs/MultiRunJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ public class MultiRunJob : Job
private Dictionary<string, string> legacyGlobalCookies;
private Dictionary<string, ConfigResource> resources;
private HttpClient httpClient;
private AsyncLocker asyncLocker;
private Timer proxyReloadTimer;

// Instance properties and stats
Expand Down Expand Up @@ -136,7 +135,7 @@ public class MultiRunJob : Job
public MultiRunJob(RuriLibSettingsService settings, PluginRepository pluginRepo, IJobLogger logger = null)
: base(settings, pluginRepo, logger)
{

}

#region Work Function
Expand Down Expand Up @@ -227,28 +226,22 @@ public MultiRunJob(RuriLibSettingsService settings, PluginRepository pluginRepo,
{
if (input.Job.NoValidProxyBehaviour == NoValidProxyBehaviour.Reload)
{
try
using (await AsyncLocker.LockAsync(typeof(ProxyPool), nameof(ProxyPool.ReloadAllAsync),
input.BotData.CancellationToken).ConfigureAwait(false))
{
await input.BotData.AsyncLocker.Acquire(typeof(ProxyPool), nameof(ProxyPool.ReloadAllAsync),
input.BotData.CancellationToken).ConfigureAwait(false);

botData.Proxy = input.ProxyPool.GetProxy(input.Job.ConcurrentProxyMode, input.BotData.ConfigSettings.ProxySettings.MaxUsesPerProxy);

if (botData.Proxy == null)
{
await input.ProxyPool.ReloadAllAsync(true, token).ConfigureAwait(false);
}
}
finally
{
input.BotData.AsyncLocker.Release(typeof(ProxyPool), nameof(ProxyPool.ReloadAllAsync));
}
}
else if (input.Job.NoValidProxyBehaviour == NoValidProxyBehaviour.Unban)
{
input.ProxyPool.UnbanAll(input.Job.ProxyBanTime);
}

goto GETPROXY;
}
}
Expand All @@ -274,7 +267,7 @@ await input.BotData.AsyncLocker.Acquire(typeof(ProxyPool), nameof(ProxyPool.Relo
});

await task.ConfigureAwait(false);

}
// If it's a legacy config, run the LoliScript engine
else if (input.IsLegacy)
Expand Down Expand Up @@ -437,7 +430,7 @@ await input.BotData.AsyncLocker.Acquire(typeof(ProxyPool), nameof(ProxyPool.Relo
}

// RETURN THE RESULT
return new CheckResult
return new CheckResult
{
BotData = botData,
OutputVariables = outputVariables
Expand All @@ -458,8 +451,6 @@ public override async Task Start(CancellationToken cancellationToken = default)
Status = JobStatus.Starting;
OnStatusChanged?.Invoke(this, Status);

asyncLocker = new();

if (Config == null)
throw new NullReferenceException("The Config cannot be null");

Expand All @@ -482,15 +473,10 @@ public override async Task Start(CancellationToken cancellationToken = default)

var proxyPoolOptions = new ProxyPoolOptions { AllowedTypes = Config.Settings.ProxySettings.AllowedProxyTypes };
proxyPool = new ProxyPool(ProxySources, proxyPoolOptions);
try
using (await AsyncLocker.LockAsync(typeof(ProxyPool), nameof(ProxyPool.ReloadAllAsync), cancellationToken).ConfigureAwait(false))
{
await asyncLocker.Acquire(typeof(ProxyPool), nameof(ProxyPool.ReloadAllAsync), cancellationToken).ConfigureAwait(false);
await proxyPool.ReloadAllAsync(ShuffleProxies, cancellationToken).ConfigureAwait(false);
}
finally
{
asyncLocker.Release(typeof(ProxyPool), nameof(ProxyPool.ReloadAllAsync));
}

if (!proxyPool.Proxies.Any())
{
Expand Down Expand Up @@ -621,7 +607,6 @@ public override async Task Start(CancellationToken cancellationToken = default)
input.BotData.Logger.Enabled = settings.RuriLibSettings.GeneralSettings.EnableBotLogging && Config.Mode != ConfigMode.DLL;
input.BotData.SetObject("httpClient", httpClient); // Add the default HTTP client
input.BotData.SetObject("ironPyEngine", pyengine); // Add the IronPython engine
input.BotData.AsyncLocker = asyncLocker;

return input;
});
Expand Down Expand Up @@ -725,16 +710,10 @@ public override async Task Resume()
#region Public Methods
public async Task FetchProxiesFromSources(CancellationToken cancellationToken = default)
{
try
using (await AsyncLocker.LockAsync(typeof(ProxyPool), nameof(ProxyPool.ReloadAllAsync), cancellationToken).ConfigureAwait(false))
{
await asyncLocker.Acquire(typeof(ProxyPool), nameof(ProxyPool.ReloadAllAsync), cancellationToken).ConfigureAwait(false);
await proxyPool.ReloadAllAsync(ShuffleProxies).ConfigureAwait(false);
}
finally
{
asyncLocker.Release(typeof(ProxyPool), nameof(ProxyPool.ReloadAllAsync));
}

}

#endregion
Expand Down Expand Up @@ -769,7 +748,7 @@ private void PropagateResult(object _, ResultDetails<MultiRunInput, CheckResult>
OnResult?.Invoke(this, result);

if (!settings.RuriLibSettings.GeneralSettings.LogAllResults) return;

var data = result.Result.BotData;
logger?.LogInfo(Id, $"[{data.STATUS}] {data.Line.Data} ({data.Proxy})");
}
Expand Down Expand Up @@ -800,20 +779,11 @@ private void StartTimers()
// Unhandled exceptions will crash the process
if (proxyPool is not null)
{
try
using (await AsyncLocker.LockAsync(typeof(ProxyPool), nameof(ProxyPool.ReloadAllAsync), CancellationToken.None)
.ConfigureAwait(false))
{
await asyncLocker.Acquire(typeof(ProxyPool), nameof(ProxyPool.ReloadAllAsync), CancellationToken.None)
.ConfigureAwait(false);
await proxyPool.ReloadAllAsync(ShuffleProxies).ConfigureAwait(false);
}
catch
{
// ignored
}
finally
{
asyncLocker.Release(typeof(ProxyPool), nameof(ProxyPool.ReloadAllAsync));
}
}
}), null, (int)PeriodicReloadInterval.TotalMilliseconds, (int)PeriodicReloadInterval.TotalMilliseconds);
}
Expand Down Expand Up @@ -890,8 +860,8 @@ private async Task RegisterHit(CheckResult result)
var hit = new Hit()
{
Data = botData.Line,
BotLogger = settings.RuriLibSettings.GeneralSettings.EnableBotLogging && Config.Mode != ConfigMode.DLL
? botData.Logger
BotLogger = settings.RuriLibSettings.GeneralSettings.EnableBotLogging && Config.Mode != ConfigMode.DLL
? botData.Logger
: null,
Type = botData.STATUS,
DataPool = DataPool,
Expand Down Expand Up @@ -969,20 +939,6 @@ private void DisposeGlobals()
}
}

if (asyncLocker is not null)
{
try
{
asyncLocker.Dispose();
}
catch
{

}
}

proxyPool?.Dispose();

if (ProxySources is not null)
{
for (int i = 0; i < ProxySources.Count; i++)
Expand Down
25 changes: 3 additions & 22 deletions RuriLib/Models/Proxies/ProxyPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ public class ProxyPool : IDisposable

private readonly int minBackoff = 5000;
private readonly int maxReloadTries = 10;
private AsyncLocker asyncLocker;

/// <summary>
/// Initializes the proxy pool given the proxy sources.
Expand All @@ -34,7 +33,6 @@ public ProxyPool(IEnumerable<ProxySource> sources, ProxyPoolOptions options = nu
{
this.sources = sources.ToList();
this.options = options ?? new ProxyPoolOptions();
this.asyncLocker = new();
}

/// <summary>
Expand Down Expand Up @@ -157,13 +155,12 @@ public async Task ReloadAllAsync(bool shuffle = true, CancellationToken cancella
return;
}

try
isReloadingProxies = true;
using (await AsyncLocker.LockAsync(typeof(ProxyPool), nameof(ProxyPool.ReloadAllAsync), cancellationToken).ConfigureAwait(false))
{
isReloadingProxies = true;
await asyncLocker.Acquire(typeof(ProxyPool), nameof(ProxyPool.ReloadAllAsync), cancellationToken).ConfigureAwait(false);
var currentTry = 0;
var currentBackoff = minBackoff;

// For a maximum of 'maxReloadTries' times
while (currentTry < maxReloadTries)
{
Expand All @@ -180,11 +177,7 @@ public async Task ReloadAllAsync(bool shuffle = true, CancellationToken cancella
currentTry++;
currentBackoff *= 2;
}
}
finally
{
isReloadingProxies = false;
asyncLocker.Release(typeof(ProxyPool), nameof(ProxyPool.ReloadAllAsync));
}
}

Expand Down Expand Up @@ -239,18 +232,6 @@ private async Task<bool> TryReloadAllAsync(bool shuffle = true, CancellationToke

public void Dispose()
{
if (asyncLocker is not null)
{
try
{
asyncLocker.Dispose();
asyncLocker = null;
}
catch
{
// ignored
}
}
}
}
}
24 changes: 7 additions & 17 deletions RuriLib/Models/Proxies/ProxySource/FileProxySource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,10 @@ namespace RuriLib.Models.Proxies.ProxySources
public class FileProxySource : ProxySource
{
public string FileName { get; set; }
private AsyncLocker asyncLocker;

public FileProxySource(string fileName)
{
FileName = fileName;
asyncLocker = new();
}

public async override Task<IEnumerable<Proxy>> GetAllAsync(CancellationToken cancellationToken = default)
Expand All @@ -29,14 +27,15 @@ public async override Task<IEnumerable<Proxy>> GetAllAsync(CancellationToken can
// The file is a script.
// We will run the execute and read it's stdout for proxies.
// just like raw proxy files, one proxy per line
await asyncLocker.Acquire("ProxySourceReloadScriptFile", CancellationToken.None).ConfigureAwait(false);
var stdout = await RunScript.RunScriptAndGetStdOut(FileName).ConfigureAwait(false);
if (stdout is null)
using (await AsyncLocker.LockAsync("ProxySourceReloadScriptFile").ConfigureAwait(false))
{
throw new Exception($"Failed to get stdout of {FileName}");
var stdout = await RunScript.RunScriptAndGetStdOut(FileName).ConfigureAwait(false);
if (stdout is null)
{
throw new Exception($"Failed to get stdout of {FileName}");
}
lines = stdout.Split('\n', StringSplitOptions.RemoveEmptyEntries);
}
lines = stdout.Split('\n', StringSplitOptions.RemoveEmptyEntries);
asyncLocker.Release("ProxySourceReloadScriptFile");
}
else
{
Expand All @@ -50,15 +49,6 @@ public async override Task<IEnumerable<Proxy>> GetAllAsync(CancellationToken can

public override void Dispose()
{
try
{
asyncLocker.Dispose();
asyncLocker = null;
}
catch
{
// ignored
}
}
}
}
Loading