Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions SWLOR.BackgroundServices/BackgroundJobs/BackgroundJob.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
using StackExchange.Redis;

namespace SWLOR.BackgroundServices.BackgroundJobs
{
public sealed class BackgroundJob
{
public RedisValue Id { get; }
public string Type { get; }
public string Payload { get; }
public int Attempt { get; }
public StreamEntry Entry { get; }

private BackgroundJob(RedisValue id, string type, string payload, int attempt, StreamEntry entry)
{
Id = id;
Type = type;
Payload = payload;
Attempt = attempt;
Entry = entry;
}

public static bool TryCreate(StreamEntry entry, out BackgroundJob? job, out string error)
{
var values = entry.Values.ToDictionary(x => x.Name.ToString(), x => x.Value.ToString());
var type = values.GetValueOrDefault("type");
var payload = values.GetValueOrDefault("payload");

if (string.IsNullOrWhiteSpace(type) || string.IsNullOrWhiteSpace(payload))
{
job = null;
error = "Job is missing type or payload.";
return false;
}

job = new BackgroundJob(entry.Id, type, payload, ParseAttempt(values.GetValueOrDefault("attempt")), entry);
error = string.Empty;
return true;
}

private static int ParseAttempt(string? value)
{
return int.TryParse(value, out var attempt)
? Math.Max(0, attempt)
: 0;
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
}
}
109 changes: 109 additions & 0 deletions SWLOR.BackgroundServices/BackgroundJobs/BackgroundJobFailureHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
using StackExchange.Redis;
using SWLOR.BackgroundServices.Configuration;
using SWLOR.BackgroundServices.Infrastructure;

namespace SWLOR.BackgroundServices.BackgroundJobs
{
public sealed class BackgroundJobFailureHandler
{
private const string RequeueAndAcknowledgeScript = @"
redis.call('XADD', KEYS[1], 'MAXLEN', '~', ARGV[1], '*',
'type', ARGV[4],
'payload', ARGV[5],
'attempt', ARGV[6],
'createdUtc', ARGV[7],
'lastError', ARGV[8])
return redis.call('XACK', KEYS[1], ARGV[2], ARGV[3])
";

private const string MoveToDeadLetterAndAcknowledgeScript = @"
redis.call('XADD', KEYS[1], '*',
'originalId', ARGV[2],
'error', ARGV[3],
'failedUtc', ARGV[4],
unpack(ARGV, 5))
return redis.call('XACK', KEYS[2], ARGV[1], ARGV[2])
";

private readonly BackgroundServiceSettings _settings;
private readonly IAppLogger _logger;

public BackgroundJobFailureHandler(BackgroundServiceSettings settings, IAppLogger logger)
{
_settings = settings;
_logger = logger;
}

public async Task HandleFailureAsync(IDatabase database, BackgroundJob job, Exception exception)
{
var nextAttempt = job.Attempt + 1;
if (nextAttempt >= _settings.MaxAttempts)
{
await MoveToDeadLetterAsync(database, job.Entry, exception.ToString());
_logger.Error($"Background job {job.Id} failed permanently after {nextAttempt} attempts: {exception.Message}");
return;
}

await RequeueAndAcknowledgeAsync(database, job, nextAttempt, exception.ToString());
_logger.Error($"Background job {job.Id} failed attempt {nextAttempt}; requeued. {exception.Message}");
}

public async Task MoveToDeadLetterAsync(IDatabase database, StreamEntry entry, string error)
{
var arguments = new List<RedisValue>
{
BackgroundJobQueueNames.ConsumerGroup,
entry.Id,
Truncate(error),
DateTime.UtcNow.ToString("O")
};

foreach (var value in entry.Values)
{
arguments.Add(value.Name);
arguments.Add(value.Value);
}

await database.ScriptEvaluateAsync(
MoveToDeadLetterAndAcknowledgeScript,
new RedisKey[]
{
BackgroundJobQueueNames.DeadLetterStreamName,
BackgroundJobQueueNames.StreamName
},
arguments.ToArray());
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

private async Task RequeueAndAcknowledgeAsync(
IDatabase database,
BackgroundJob job,
int nextAttempt,
string error)
{
await database.ScriptEvaluateAsync(
RequeueAndAcknowledgeScript,
new RedisKey[]
{
BackgroundJobQueueNames.StreamName
},
new RedisValue[]
{
BackgroundJobQueueNames.MaxStreamLength,
BackgroundJobQueueNames.ConsumerGroup,
job.Id,
job.Type,
job.Payload,
nextAttempt.ToString(),
DateTime.UtcNow.ToString("O"),
Truncate(error)
});
}

private string Truncate(string value)
{
return value.Length <= _settings.MaxLogContentLength
? value
: value.Substring(0, _settings.MaxLogContentLength) + "...";
}
}
}
64 changes: 64 additions & 0 deletions SWLOR.BackgroundServices/BackgroundJobs/BackgroundJobProcessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
using StackExchange.Redis;
using SWLOR.BackgroundServices.Infrastructure;

namespace SWLOR.BackgroundServices.BackgroundJobs
{
public sealed class BackgroundJobProcessor
{
private readonly IReadOnlyDictionary<string, IBackgroundJobHandler> _handlers;
private readonly BackgroundJobFailureHandler _failureHandler;
private readonly IAppLogger _logger;

public BackgroundJobProcessor(
IReadOnlyDictionary<string, IBackgroundJobHandler> handlers,
BackgroundJobFailureHandler failureHandler,
IAppLogger logger)
{
_handlers = handlers;
_failureHandler = failureHandler;
_logger = logger;
}

public async Task ProcessAsync(IDatabase database, StreamEntry entry, CancellationToken cancellationToken)
{
if (!BackgroundJob.TryCreate(entry, out var job, out var error))
{
await _failureHandler.MoveToDeadLetterAsync(database, entry, error);
return;
}

var backgroundJob = job!;
if (!_handlers.TryGetValue(backgroundJob.Type, out var handler))
{
await _failureHandler.MoveToDeadLetterAsync(
database,
backgroundJob.Entry,
$"Unsupported background job type '{backgroundJob.Type}'.");
_logger.Error($"Unsupported background job type '{backgroundJob.Type}' for job {backgroundJob.Id}; moved to dead-letter.");
return;
}

try
{
await handler.HandleAsync(backgroundJob.Payload, cancellationToken);
// Redis Streams provide at-least-once delivery here: if the handler succeeds
// but XACK fails, this job can be delivered again. Handlers must be idempotent
// or use their own deduplication/idempotency keys.
await database.StreamAcknowledgeAsync(
BackgroundJobQueueNames.StreamName,
BackgroundJobQueueNames.ConsumerGroup,
backgroundJob.Id);

Comment thread
coderabbitai[bot] marked this conversation as resolved.
_logger.Info($"Processed background job {backgroundJob.Id} ({backgroundJob.Type}).");
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
throw;
}
catch (Exception ex)
{
await _failureHandler.HandleFailureAsync(database, backgroundJob, ex);
}
}
}
}
10 changes: 10 additions & 0 deletions SWLOR.BackgroundServices/BackgroundJobs/BackgroundJobQueueNames.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
namespace SWLOR.BackgroundServices.BackgroundJobs
{
public static class BackgroundJobQueueNames
{
public const string StreamName = "swlor:background-jobs";
public const string DeadLetterStreamName = "swlor:background-jobs:dead";
public const string ConsumerGroup = "swlor-background-services";
public const int MaxStreamLength = 10000;
}
}
8 changes: 8 additions & 0 deletions SWLOR.BackgroundServices/BackgroundJobs/BackgroundJobTypes.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace SWLOR.BackgroundServices.BackgroundJobs
{
public static class BackgroundJobTypes
{
public const string GitHubIssue = "GitHubIssue";
public const string DiscordWebhook = "DiscordWebhook";
}
}
Loading