Skip to content

Commit

Permalink
Merge pull request #369 from shokanson-extensiv/features/#368-allow-s…
Browse files Browse the repository at this point in the history
…cheduler-parallelism

#368 allow scheduler parallelism
  • Loading branch information
azygis authored Jun 27, 2024
2 parents 28da820 + e724c68 commit fbb8e2a
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 3 deletions.
29 changes: 29 additions & 0 deletions src/Hangfire.PostgreSql/PostgreSqlConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,35 @@ public override string GetFirstByLowestScoreFromSet(string key, double fromScore
new { Key = key, FromScore = fromScore, ToScore = toScore }));
}

public override List<string> GetFirstByLowestScoreFromSet(string key, double fromScore, double toScore, int count)
{
if (key == null)
{
throw new ArgumentNullException(nameof(key));
}

if (toScore < fromScore)
{
throw new ArgumentException($"The '{nameof(toScore)}' value must be higher or equal to the '{nameof(fromScore)}' value.");
}

if (count < 1)
{
throw new ArgumentException($"The '{nameof(count)}' value must be greater than zero (0).");
}

return _storage.UseConnection(_dedicatedConnection, connection => connection
.Query<string>($@"
SELECT ""value""
FROM ""{_options.SchemaName}"".""set""
WHERE ""key"" = @Key
AND ""score"" BETWEEN @FromScore AND @ToScore
ORDER BY ""score"" LIMIT @Limit;
",
new { Key = key, FromScore = fromScore, ToScore = toScore, Limit = count }))
.ToList();
}

public override void SetRangeInHash(string key, IEnumerable<KeyValuePair<string, string>> keyValuePairs)
{
if (key == null)
Expand Down
1 change: 1 addition & 0 deletions src/Hangfire.PostgreSql/PostgreSqlStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class PostgreSqlStorage : JobStorage
new(StringComparer.OrdinalIgnoreCase)
{
{ JobStorageFeatures.JobQueueProperty, true },
{ JobStorageFeatures.Connection.BatchedGetFirstByLowest, true }
};

[Obsolete("Will be removed in 2.0, please use the overload with IConnectionFactory argument")]
Expand Down
120 changes: 120 additions & 0 deletions tests/Hangfire.PostgreSql.Tests/PostgreSqlConnectionFacts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,126 @@ public void GetFirstByLowestScoreFromSet_ReturnsTheValueWithTheLowestScore()
});
}

[Fact]
[CleanDatabase]
public void GetFirstByLowestScoreFromSet_List_ThrowsAnException_WhenKeyIsNull()
{
UseConnection(connection => {
ArgumentNullException exception = Assert.Throws<ArgumentNullException>(() => connection.GetFirstByLowestScoreFromSet(null, 0, 1, 1));
Assert.Equal("key", exception.ParamName);
});
}

[Fact]
[CleanDatabase]
public void GetFirstByLowestScoreFromSet_List_ThrowsAnException_WhenToScoreIsLowerThanFromScore()
{
UseConnection(connection => {
ArgumentException exception = Assert.Throws<ArgumentException>(() => connection.GetFirstByLowestScoreFromSet("key", 0, -1, 1));
Assert.Contains("The 'toScore' value must be higher or equal to the 'fromScore' value.", exception.Message);
});
}

[Theory]
[CleanDatabase]
[InlineData(-1)]
[InlineData(0)]
public void GetFirstByLowestScoreFromSet_List_ThrowsAnException_WhenCountIsLessThanOne(int count)
{
UseConnection(connection => {
ArgumentException exception = Assert.Throws<ArgumentException>(() => connection.GetFirstByLowestScoreFromSet("key", 0, 1, count));
Assert.Contains("The 'count' value must be greater than zero (0).", exception.Message);
});
}

[Fact]
[CleanDatabase]
public void GetFirstByLowestScoreFromSet_List_ReturnsEmpty_WhenTheKeyDoesNotExist()
{
UseConnection(connection => {
List<string> result = connection.GetFirstByLowestScoreFromSet("key", 0, 1, 1);
Assert.NotNull(result);
Assert.Empty(result);
});
}

[Fact]
[CleanDatabase]
public void GetFirstByLowestScoreFromSet_List_ReturnsEmpty_WhenNoValuesExistForKey()
{
string arrangeSql = $@"
INSERT INTO ""{GetSchemaName()}"".""set"" (""key"", ""score"", ""value"")
VALUES
('another-key', -2.0, '-2.0')
";

UseConnections((connection, jobStorageConnection) => {
connection.Execute(arrangeSql);
List<string> result = jobStorageConnection.GetFirstByLowestScoreFromSet("key", 0, 1, 1);
Assert.NotNull(result);
Assert.Empty(result);
});
}

[Fact]
[CleanDatabase]
public void GetFirstByLowestScoreFromSet_List_ReturnsAllLowestValuesMatchingInputs()
{
string arrangeSql = $@"
INSERT INTO ""{GetSchemaName()}"".""set"" (""key"", ""score"", ""value"")
VALUES
('key', 1.0, '1.0'),
('key', -1.0, '-1.0'),
('key', -5.0, '-5.0'),
('another-key', -2.0, '-2.0')
";

UseConnections((connection, jobStorageConnection) => {
connection.Execute(arrangeSql);
List<string> result = jobStorageConnection.GetFirstByLowestScoreFromSet("key", -1.0, 3.0, 10);
Assert.Equal(2, result.Count);
Assert.Equal("-1.0", result[0]);
Assert.Equal("1.0", result[1]);
});
}

[Fact]
[CleanDatabase]
public void GetFirstByLowestScoreFromSet_List_ReturnsSubsetOfLowestValuesMatchingInputs()
{
string arrangeSql = $@"
INSERT INTO ""{GetSchemaName()}"".""set"" (""key"", ""score"", ""value"")
VALUES
('key', 1.0, '1.0'),
('key', 1.5, '1.5'),
('key', 2.0, '2.0'),
('key', 2.5, '2.5'),
('key', -1.0, '-1.0'),
('key', -5.0, '-5.0'),
('another-key', -2.0, '-2.0')
";

int count = 3;
UseConnections((connection, jobStorageConnection) => {
connection.Execute(arrangeSql);
List<string> result = jobStorageConnection.GetFirstByLowestScoreFromSet("key", -1.0, 3.0, count);
Assert.Equal(count, result.Count);
Assert.Equal("-1.0", result[0]);
Assert.Equal("1.0", result[1]);
Assert.Equal("1.5", result[2]);
});
}

[Fact]
[CleanDatabase]
public void AnnounceServer_ThrowsAnException_WhenServerIdIsNull()
Expand Down
25 changes: 22 additions & 3 deletions tests/Hangfire.PostgreSql.Tests/PostgreSqlStorageFacts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,34 @@ public void CanUseTransaction_WithDifferentTransactionIsolationLevel()
{
using TransactionScope scope = new(TransactionScopeOption.Required,
new TransactionOptions() { IsolationLevel = IsolationLevel.Serializable });

PostgreSqlStorage storage = new(new DefaultConnectionFactory(), _options);
NpgsqlConnection connection = storage.CreateAndOpenConnection();

bool success = storage.UseTransaction(connection, (_, _) => true);

Assert.True(success);
}

[Fact]
public void HasFeature_ThrowsAnException_WhenFeatureIsNull()
{
ArgumentNullException aex = Assert.Throws<ArgumentNullException>(() => new PostgreSqlStorage(new DefaultConnectionFactory(), _options).HasFeature(null));
Assert.Equal("featureId", aex.ParamName);
}

[Theory]
[InlineData("Job.Queue", true)] // JobStorageFeatures.JobQueueProperty
[InlineData("Connection.BatchedGetFirstByLowestScoreFromSet", true)] // JobStorageFeatures.Connection.BatchedGetFirstByLowest
[InlineData("", false)]
[InlineData("Unsupported", false)]
public void HasFeature_ReturnsCorrectValues(string featureName, bool expected)
{
PostgreSqlStorage storage = new(new DefaultConnectionFactory(), _options);
bool actual = storage.HasFeature(featureName);
Assert.Equal(expected, actual);
}

private PostgreSqlStorage CreateStorage()
{
return new PostgreSqlStorage(ConnectionUtils.GetDefaultConnectionFactory(), _options);
Expand Down

0 comments on commit fbb8e2a

Please sign in to comment.