Skip to content

Commit

Permalink
Merge pull request #353 from timpikelmg/feature/209-add-sliding-invis…
Browse files Browse the repository at this point in the history
…ibility-timeouts

#209 - Add Support for Sliding Invisibility Timeouts
  • Loading branch information
azygis committed Jun 27, 2024
2 parents fbb8e2a + b8817ff commit 942fccb
Show file tree
Hide file tree
Showing 9 changed files with 415 additions and 48 deletions.
137 changes: 119 additions & 18 deletions src/Hangfire.PostgreSql/PostgreSqlFetchedJob.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// This file is part of Hangfire.PostgreSql.
// This file is part of Hangfire.PostgreSql.
// Copyright © 2014 Frank Hommers <http://hmm.rs/Hangfire.PostgreSql>.
//
// Hangfire.PostgreSql is free software: you can redistribute it and/or modify
Expand All @@ -20,55 +20,91 @@
// Special thanks goes to him.

using System;
using System.Threading;
using Dapper;
using Hangfire.Logging;
using Hangfire.PostgreSql.Utils;
using Hangfire.Storage;

namespace Hangfire.PostgreSql
{
public class PostgreSqlFetchedJob : IFetchedJob
{
private readonly ILog _logger = LogProvider.GetLogger(typeof(PostgreSqlFetchedJob));

private readonly PostgreSqlStorage _storage;
private bool _disposed;
private bool _removedFromQueue;
private bool _requeued;

private readonly object _syncRoot = new object();
private long _lastHeartbeat;
private readonly TimeSpan _interval;

public PostgreSqlFetchedJob(
PostgreSqlStorage storage,
long id,
string jobId,
string queue)
string queue,
DateTime? fetchedAt)
{
_storage = storage ?? throw new ArgumentNullException(nameof(storage));

Id = id;
JobId = jobId ?? throw new ArgumentNullException(nameof(jobId));
Queue = queue ?? throw new ArgumentNullException(nameof(queue));
FetchedAt = fetchedAt ?? throw new ArgumentNullException(nameof(fetchedAt));

if (storage.Options.UseSlidingInvisibilityTimeout)
{
_lastHeartbeat = TimestampHelper.GetTimestamp();
_interval = TimeSpan.FromSeconds(storage.Options.InvisibilityTimeout.TotalSeconds / 5);
storage.HeartbeatProcess.Track(this);
}
}

public long Id { get; }
public string Queue { get; }
public string JobId { get; }
internal DateTime? FetchedAt { get; private set; }

public void RemoveFromQueue()
{
_storage.UseConnection(null, connection => connection.Execute($@"
DELETE FROM ""{_storage.Options.SchemaName}"".""jobqueue"" WHERE ""id"" = @Id;
",
new { Id }));
lock (_syncRoot)
{
if (!FetchedAt.HasValue)
{
return;
}

_storage.UseConnection(null, connection => connection.Execute($@"
DELETE FROM ""{_storage.Options.SchemaName}"".""jobqueue"" WHERE ""id"" = @Id AND ""fetchedat"" = @FetchedAt;
",
new { Id, FetchedAt }));

_removedFromQueue = true;
_removedFromQueue = true;
}
}

public void Requeue()
{
_storage.UseConnection(null, connection => connection.Execute($@"
UPDATE ""{_storage.Options.SchemaName}"".""jobqueue""
SET ""fetchedat"" = NULL
WHERE ""id"" = @Id;
",
new { Id }));

_requeued = true;
lock (_syncRoot)
{
if (!FetchedAt.HasValue)
{
return;
}

_storage.UseConnection(null, connection => connection.Execute($@"
UPDATE ""{_storage.Options.SchemaName}"".""jobqueue""
SET ""fetchedat"" = NULL
WHERE ""id"" = @Id AND ""fetchedat"" = @FetchedAt;
",
new { Id, FetchedAt }));

FetchedAt = null;
_requeued = true;
}
}

public void Dispose()
Expand All @@ -78,12 +114,77 @@ public void Dispose()
return;
}

if (!_removedFromQueue && !_requeued)
_disposed = true;

DisposeTimer();

lock (_syncRoot)
{
Requeue();
if (!_removedFromQueue && !_requeued)
{
Requeue();
}
}
}

internal void DisposeTimer()
{
if (_storage.Options.UseSlidingInvisibilityTimeout)
{
_storage.HeartbeatProcess.Untrack(this);
}
}

internal void ExecuteKeepAliveQueryIfRequired()
{
var now = TimestampHelper.GetTimestamp();

_disposed = true;
if (TimestampHelper.Elapsed(now, Interlocked.Read(ref _lastHeartbeat)) < _interval)
{
return;
}

lock (_syncRoot)
{
if (!FetchedAt.HasValue)
{
return;
}

if (_requeued || _removedFromQueue)
{
return;
}

string updateFetchAtSql = $@"
UPDATE ""{_storage.Options.SchemaName}"".""jobqueue""
SET ""fetchedat"" = NOW()
WHERE ""id"" = @id AND ""fetchedat"" = @fetchedAt
RETURNING ""fetchedat"" AS ""FetchedAt"";
";

try
{
_storage.UseConnection(null, connection =>
{
FetchedAt = connection.ExecuteScalar<DateTime?>(updateFetchAtSql,
new { queue = Queue, id = Id, fetchedAt = FetchedAt });
});

if (!FetchedAt.HasValue)
{
_logger.Warn(
$"Background job identifier '{JobId}' was fetched by another worker, will not execute keep alive.");
}

_logger.Trace($"Keep-alive query for message {Id} sent");
Interlocked.Exchange(ref _lastHeartbeat, now);
}
catch (Exception ex) when (ex.IsCatchableExceptionType())
{
_logger.DebugException($"Unable to execute keep-alive query for message {Id}", ex);
}
}
}
}
}
61 changes: 61 additions & 0 deletions src/Hangfire.PostgreSql/PostgreSqlHeartbeatProcess.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// This file is part of Hangfire.PostgreSql.
// Copyright © 2014 Frank Hommers <http://hmm.rs/Hangfire.PostgreSql>.
//
// Hangfire.PostgreSql is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as
// published by the Free Software Foundation, either version 3
// of the License, or any later version.
//
// Hangfire.PostgreSql is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with Hangfire.PostgreSql. If not, see <http://www.gnu.org/licenses/>.
//
// This work is based on the work of Sergey Odinokov, author of
// Hangfire. <http://hangfire.io/>
//
// Special thanks goes to him.

using System;
using System.Collections.Concurrent;
using System.Threading;
using Hangfire.Common;
using Hangfire.Server;

namespace Hangfire.PostgreSql
{
#pragma warning disable CS0618
internal sealed class PostgreSqlHeartbeatProcess : IServerComponent, IBackgroundProcess
#pragma warning restore CS0618
{
private readonly ConcurrentDictionary<PostgreSqlFetchedJob, object> _items = new();

public void Track(PostgreSqlFetchedJob item)
{
_items.TryAdd(item, null);
}

public void Untrack(PostgreSqlFetchedJob item)
{
_items.TryRemove(item, out var _);
}

public void Execute(CancellationToken cancellationToken)
{
foreach (var item in _items)
{
item.Key.ExecuteKeepAliveQueryIfRequired();
}

cancellationToken.Wait(TimeSpan.FromSeconds(1));
}

public void Execute(BackgroundProcessContext context)
{
Execute(context.StoppingToken);
}
}
}
7 changes: 4 additions & 3 deletions src/Hangfire.PostgreSql/PostgreSqlJobQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,8 @@ LIMIT 1
return new PostgreSqlFetchedJob(_storage,
fetchedJob.Id,
fetchedJob.JobId.ToString(CultureInfo.InvariantCulture),
fetchedJob.Queue);
fetchedJob.Queue,
fetchedJob.FetchedAt);
}

[NotNull]
Expand All @@ -214,7 +215,6 @@ internal IFetchedJob Dequeue_UpdateCount(string[] queues, CancellationToken canc
long timeoutSeconds = (long)_storage.Options.InvisibilityTimeout.Negate().TotalSeconds;
FetchedJob markJobAsFetched = null;


string jobToFetchSql = $@"
SELECT ""id"" AS ""Id"", ""jobid"" AS ""JobId"", ""queue"" AS ""Queue"", ""fetchedat"" AS ""FetchedAt"", ""updatecount"" AS ""UpdateCount""
FROM ""{_storage.Options.SchemaName}"".""jobqueue""
Expand Down Expand Up @@ -264,7 +264,8 @@ internal IFetchedJob Dequeue_UpdateCount(string[] queues, CancellationToken canc
return new PostgreSqlFetchedJob(_storage,
markJobAsFetched.Id,
markJobAsFetched.JobId.ToString(CultureInfo.InvariantCulture),
markJobAsFetched.Queue);
markJobAsFetched.Queue,
markJobAsFetched.FetchedAt);
}

private Task ListenForNotificationsAsync(CancellationToken cancellationToken)
Expand Down
14 changes: 13 additions & 1 deletion src/Hangfire.PostgreSql/PostgreSqlStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,18 @@ public PostgreSqlStorage(IConnectionFactory connectionFactory, PostgreSqlStorage
}

InitializeQueueProviders();
if (Options.UseSlidingInvisibilityTimeout)
{
HeartbeatProcess = new PostgreSqlHeartbeatProcess();
}
}

public PersistentJobQueueProviderCollection QueueProviders { get; internal set; }

internal PostgreSqlStorageOptions Options { get; }

internal PostgreSqlHeartbeatProcess HeartbeatProcess { get; }

public override IMonitoringApi GetMonitoringApi()
{
return new PostgreSqlMonitoringApi(this, QueueProviders);
Expand All @@ -124,13 +130,19 @@ public override IEnumerable<IServerComponent> GetComponents()
{
yield return new ExpirationManager(this);
yield return new CountersAggregator(this, Options.CountersAggregateInterval);
if (Options.UseSlidingInvisibilityTimeout)
{
// This is only used to update the sliding invisibility timeouts, so if not enabled then do not use it
yield return HeartbeatProcess;
}
}

public override void WriteOptionsToLog(ILog logger)
{
logger.Info("Using the following options for SQL Server job storage:");
logger.Info("Using the following options for PostgreSQL job storage:");
logger.InfoFormat(" Queue poll interval: {0}.", Options.QueuePollInterval);
logger.InfoFormat(" Invisibility timeout: {0}.", Options.InvisibilityTimeout);
logger.InfoFormat(" Use sliding invisibility timeout: {0}.", Options.UseSlidingInvisibilityTimeout);
}

public override string ToString()
Expand Down
11 changes: 10 additions & 1 deletion src/Hangfire.PostgreSql/PostgreSqlStorageOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public PostgreSqlStorageOptions()
PrepareSchemaIfNecessary = true;
EnableTransactionScopeEnlistment = true;
DeleteExpiredBatchSize = 1000;
UseSlidingInvisibilityTimeout = false;
}

public TimeSpan QueuePollInterval
Expand All @@ -68,7 +69,7 @@ public TimeSpan InvisibilityTimeout
_invisibilityTimeout = value;
}
}

public TimeSpan DistributedLockTimeout
{
get => _distributedLockTimeout;
Expand Down Expand Up @@ -125,6 +126,14 @@ public int DeleteExpiredBatchSize
public bool EnableTransactionScopeEnlistment { get; set; }
public bool EnableLongPolling { get; set; }

/// <summary>
/// Apply a sliding invisibility timeout where the last fetched time is continually updated in the background.
/// This allows a lower invisibility timeout to be used with longer running jobs
/// IMPORTANT: If <see cref="BackgroundJobServerOptions.IsLightweightServer" /> option is used, then sliding invisiblity timeouts will not work
/// since the background storage processes are not run (which is used to update the invisibility timeouts)
/// </summary>
public bool UseSlidingInvisibilityTimeout { get; set; }

private static void ThrowIfValueIsNotPositive(TimeSpan value, string fieldName)
{
string message = $"The {fieldName} property value should be positive. Given: {value}.";
Expand Down
39 changes: 39 additions & 0 deletions src/Hangfire.PostgreSql/Utils/ExceptionTypeHelper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// This file is part of Hangfire. Copyright © 2022 Hangfire OÜ.
//
// Hangfire is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as
// published by the Free Software Foundation, either version 3
// of the License, or any later version.
//
// Hangfire is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with Hangfire. If not, see <http://www.gnu.org/licenses/>.

// Borrowed from Hangfire

using System;

namespace Hangfire.PostgreSql.Utils
{
internal static class ExceptionTypeHelper
{
#if !NETSTANDARD1_3
private static readonly Type StackOverflowType = typeof(StackOverflowException);
#endif
private static readonly Type OutOfMemoryType = typeof(OutOfMemoryException);

internal static bool IsCatchableExceptionType(this Exception e)
{
var type = e.GetType();
return
#if !NETSTANDARD1_3
type != StackOverflowType &&
#endif
type != OutOfMemoryType;
}
}
}
Loading

0 comments on commit 942fccb

Please sign in to comment.