Skip to content

Commit

Permalink
Add PolicyDefinition.ExtensionData, Queue.ExtensionData, IMC.GetQueue…
Browse files Browse the repository at this point in the history
…sWithoutStatsAsync (#285)

* Create QueueWithoutStats.cs

* Create QueueType.cs

* Update Queue.cs

* Update PolicyDefinition.cs

* Update IManagementClient.cs

* Update IManagementClient.cs

* Update Connection.cs

* Update IManagementClient.cs

* Update ManagementClient.cs

* Fix

* fix

* Fix formatting

* approved.txt

* string? Node

* string? Node

* .

* fix
  • Loading branch information
inikulshin authored Dec 21, 2023
1 parent 7ca648c commit 490e21e
Show file tree
Hide file tree
Showing 10 changed files with 269 additions and 67 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ public async Task Should_be_able_to_create_all_the_definitions_in_a_policy()
const long maxLengthBytes = 5000;
const Overflow overflow = Overflow.RejectPublish;
uint? consumerTimeout = fixture.RabbitmqVersion >= new Version("3.12") ? 3600000 : null;
Dictionary<string, object> extensionData = new Dictionary<string, object> { { "max-in-memory-length", 1000000 } };

await fixture.ManagementClient.CreatePolicyAsync(
new Policy(
Expand Down Expand Up @@ -265,7 +266,8 @@ await fixture.ManagementClient.CreatePolicyAsync(
MaxLengthBytes: maxLengthBytes,
Overflow: overflow,
ConsumerTimeout: consumerTimeout
),
)
{ ExtensionData = extensionData },
Priority: priority
)
);
Expand All @@ -292,7 +294,8 @@ await fixture.ManagementClient.CreatePolicyAsync(
&& p.Definition.MaxLength == maxLength
&& p.Definition.MaxLengthBytes == maxLengthBytes
&& p.Definition.Overflow == overflow
&& p.Definition.ConsumerTimeout == consumerTimeout)
&& p.Definition.ConsumerTimeout == consumerTimeout
&& p.Definition.ExtensionData.Keys.Order().SequenceEqual(extensionData.Keys.Order()))
);
}

Expand Down Expand Up @@ -1207,7 +1210,24 @@ public async Task Should_get_permissions()
public async Task Should_get_queues()
{
await CreateTestQueue(TestQueue);
(await fixture.ManagementClient.GetQueuesAsync()).Count.Should().BeGreaterThan(0);
while (true)
{
var queues = await fixture.ManagementClient.GetQueuesAsync();
queues.Should().NotBeNullOrEmpty();
if (queues[0].State != null)
{
queues[0].ExtensionData.Should().NotBeNullOrEmpty();
break;
}
}
}

[Fact]
public async Task Should_get_queues_without_stats()
{
await CreateTestQueue(TestQueue);
var queues = await fixture.ManagementClient.GetQueuesWithoutStatsAsync();
queues.Should().NotBeNullOrEmpty();
}


Expand Down
9 changes: 8 additions & 1 deletion Source/EasyNetQ.Management.Client/IManagementClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ Task<Channel> GetChannelAsync(
Task<PageResult<Exchange>> GetExchangesByPageAsync(PageCriteria pageCriteria, CancellationToken cancellationToken = default);

/// <summary>
/// A list of all exchanges.
/// A list of all exchanges for a virtual host.
/// </summary>
/// <param name="vhostName"></param>
/// <param name="cancellationToken"></param>
Expand Down Expand Up @@ -147,6 +147,13 @@ Task<Channel> GetChannelAsync(
/// <returns></returns>
Task<PageResult<Queue>> GetQueuesByPageAsync(string vhostName, PageCriteria pageCriteria, CancellationToken cancellationToken = default);

/// <summary>
/// A list of all queues.
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task<IReadOnlyList<QueueWithoutStats>> GetQueuesWithoutStatsAsync(CancellationToken cancellationToken = default);

/// <summary>
/// A list of all bindings.
/// </summary>
Expand Down
10 changes: 10 additions & 0 deletions Source/EasyNetQ.Management.Client/ManagementClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ public class ManagementClient : IManagementClient
private static readonly RelativePath Health = Api / "health";
private static readonly RelativePath Rebalance = Api / "rebalance";

private static readonly Dictionary<string, string> GetQueuesWithoutStatsQueryParameters = new Dictionary<string, string> {
{ "disable_stats", "true" },
{ "enable_queue_totals", "true" }
};

internal static readonly JsonSerializerOptions SerializerOptions;

private readonly HttpClient httpClient;
Expand Down Expand Up @@ -313,6 +318,11 @@ public Task<PageResult<Queue>> GetQueuesByPageAsync(string vhostName, PageCriter
return GetAsync<PageResult<Queue>>(Queues / vhostName, pageCriteria.ToQueryParameters(), cancellationToken);
}

public Task<IReadOnlyList<QueueWithoutStats>> GetQueuesWithoutStatsAsync(CancellationToken cancellationToken = default)
{
return GetAsync<IReadOnlyList<QueueWithoutStats>>(Queues, GetQueuesWithoutStatsQueryParameters, cancellationToken);
}

public Task CreateQueueAsync(
string vhostName,
QueueInfo queueInfo,
Expand Down
134 changes: 117 additions & 17 deletions Source/EasyNetQ.Management.Client/ManagementClientExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ public static Overview GetOverview(
.GetResult();
}


/// <summary>
/// A list of nodes in the RabbitMQ cluster.
/// </summary>
Expand Down Expand Up @@ -74,6 +73,24 @@ public static IReadOnlyList<Connection> GetConnections(
.GetResult();
}

/// <summary>
/// A list of all open connections on the specified VHost.
/// </summary>
/// <param name="client"></param>
/// <param name="vhostName"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public static IReadOnlyList<Connection> GetConnections(
this IManagementClient client,
string vhostName,
CancellationToken cancellationToken = default
)
{
return client.GetConnectionsAsync(vhostName, cancellationToken)
.GetAwaiter()
.GetResult();
}

/// <summary>
/// A list of all open channels.
/// </summary>
Expand All @@ -91,7 +108,25 @@ public static IReadOnlyList<Channel> GetChannels(
}

/// <summary>
/// A list of all open channels.
/// A list of all open channels for the given connection.
/// </summary>
/// <param name="client"></param>
/// <param name="connectionName"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public static IReadOnlyList<Channel> GetChannels(
this IManagementClient client,
string connectionName,
CancellationToken cancellationToken = default
)
{
return client.GetChannelsAsync(connectionName, cancellationToken)
.GetAwaiter()
.GetResult();
}

/// <summary>
/// A list of all open channels for the given connection.
/// </summary>
/// <param name="client"></param>
/// <param name="connection"></param>
Expand All @@ -103,6 +138,24 @@ public static Task<IReadOnlyList<Channel>> GetChannelsAsync(
CancellationToken cancellationToken = default
) => client.GetChannelsAsync(connection.Name, cancellationToken);

/// <summary>
/// A list of all open channels for the given connection.
/// </summary>
/// <param name="client"></param>
/// <param name="connection"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public static IReadOnlyList<Channel> GetChannels(
this IManagementClient client,
Connection connection,
CancellationToken cancellationToken = default
)
{
return client.GetChannelsAsync(connection, cancellationToken)
.GetAwaiter()
.GetResult();
}

/// <summary>
/// Gets the channel. This returns more detail, including consumers than the GetChannels method.
/// </summary>
Expand Down Expand Up @@ -139,6 +192,24 @@ public static IReadOnlyList<Exchange> GetExchanges(
.GetResult();
}

/// <summary>
/// A list of all exchanges for a virtual host.
/// </summary>
/// <param name="client"></param>
/// <param name="vhostName"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public static IReadOnlyList<Exchange> GetExchanges(
this IManagementClient client,
string vhostName,
CancellationToken cancellationToken = default
)
{
return client.GetExchangesAsync(vhostName, cancellationToken)
.GetAwaiter()
.GetResult();
}

/// <summary>
/// A list of all queues.
/// </summary>
Expand All @@ -155,6 +226,24 @@ public static IReadOnlyList<Queue> GetQueues(
.GetResult();
}

/// <summary>
/// A list of all queues for a virtual host.
/// </summary>
/// <param name="client"></param>
/// <param name="vhostName"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public static IReadOnlyList<Queue> GetQueues(
this IManagementClient client,
string vhostName,
CancellationToken cancellationToken = default
)
{
return client.GetQueuesAsync(vhostName, cancellationToken)
.GetAwaiter()
.GetResult();
}

/// <summary>
/// A list of all queues for a virtual host.
/// </summary>
Expand Down Expand Up @@ -239,6 +328,17 @@ public static PageResult<Queue> GetQueuesByPage(
.GetResult();
}

/// <summary>
/// A list of all queues without stats.
/// </summary>
/// <param name="client"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public static IReadOnlyList<QueueWithoutStats> GetQueuesWithoutStats(
this IManagementClient client,
CancellationToken cancellationToken = default
) => client.GetQueuesWithoutStatsAsync(cancellationToken).GetAwaiter().GetResult();

/// <summary>
/// A list of all bindings.
/// </summary>
Expand Down Expand Up @@ -1658,13 +1758,13 @@ public static Parameter GetShovel(
string vhostName,
string shovelName,
CancellationToken cancellationToken = default
)
{
return client.GetParameterAsync(vhostName, "shovel", shovelName, cancellationToken)
.GetAwaiter()
.GetResult();
)
{
return client.GetParameterAsync(vhostName, "shovel", shovelName, cancellationToken)
.GetAwaiter()
.GetResult();
}


/// <summary>
/// Creates a federation upstream in a specific vhost
/// </summary>
Expand All @@ -1680,7 +1780,7 @@ public static Task CreateFederationUpstreamAsync(
ParameterFederationValue federationUpstreamDescription,
CancellationToken cancellationToken = default
) => client.CreateParameterAsync("federation-upstream", vhostName, federationUpstreamName, federationUpstreamDescription, cancellationToken);


/// <summary>
/// Creates a federation upstream in a specific vhost
/// </summary>
Expand All @@ -1695,11 +1795,11 @@ public static void CreateFederationUpstream(
string federationUpstreamName,
ParameterFederationValue federationUpstreamDescription,
CancellationToken cancellationToken = default
)
{
)
{
client.CreateParameterAsync("federation-upstream", vhostName, federationUpstreamName, federationUpstreamDescription, cancellationToken)
.GetAwaiter()
.GetResult();
.GetResult();
}

/// <summary>
Expand Down Expand Up @@ -1728,10 +1828,10 @@ public static Parameter GetFederationUpstream(
string vhostName,
string federationUpstreamName,
CancellationToken cancellationToken = default
)
{
return client.GetParameterAsync(vhostName, "federation-upstream", federationUpstreamName, cancellationToken)
.GetAwaiter()
.GetResult();
)
{
return client.GetParameterAsync(vhostName, "federation-upstream", federationUpstreamName, cancellationToken)
.GetAwaiter()
.GetResult();
}
}
8 changes: 4 additions & 4 deletions Source/EasyNetQ.Management.Client/Model/Connection.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System.Text.Json.Serialization;
using System.Text.Json.Serialization;
using EasyNetQ.Management.Client.Serialization;

namespace EasyNetQ.Management.Client.Model;
Expand All @@ -9,7 +9,7 @@ public record Connection(
long SendOct,
long SendCnt,
long SendPend,
string State,
string? State,
string? LastBlockedBy,
string? LastBlockedAge,
long Channels,
Expand All @@ -18,13 +18,13 @@ public record Connection(
string Name,
string? Address,
int Port,
string PeerHost,
string? PeerHost,
int PeerPort,
bool Ssl,
string? PeerCertSubject,
string? PeerCertIssuer,
string? PeerCertValidity,
string AuthMechanism,
string? AuthMechanism,
string? SslProtocol,
string? SslKeyExchange,
string? SslCipher,
Expand Down
6 changes: 5 additions & 1 deletion Source/EasyNetQ.Management.Client/Model/PolicyDefinition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,8 @@ public record PolicyDefinition

[property: JsonPropertyName("queue-mode")]
string? QueueMode = null
);
)
{
[JsonExtensionData()]
public Dictionary<string, object>? ExtensionData { get; set; }
};
Loading

0 comments on commit 490e21e

Please sign in to comment.