-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathRabbitMqConnection.cs
129 lines (110 loc) · 3.92 KB
/
RabbitMqConnection.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
using Core.Extensions;
using Microsoft.Extensions.Logging;
using RabbitMq.Abtraction;
using RabbitMQ.Client;
using RabbitMQ.Client.Framing.Impl;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using static RabbitMq.RabbitMqConnectionTable;
namespace RabbitMq
{
/// <summary>
/// Wraps a single RabbitMQ <see cref="IConnection"/> and acts as a bounded pool of
/// <see cref="RabbitMqChannel"/> instances created on that connection.
/// </summary>
/// <remarks>
/// The pool storage and bookkeeping members used here (<c>_pool</c>, <c>_createdCount</c>,
/// <c>_maxSize</c>, <c>_logger</c>, <c>_disposed</c>, <c>Count</c>) are inherited from the
/// base pool class, which is declared elsewhere.
/// </remarks>
public class RabbitMqConnection : AbractPool<RabbitMqChannel>, IPoolable
{
    private IConnection _connection;
    private ConnectionPool _connectionPool;
    private readonly RabbitMqOptions _options;

    /// <summary>Options copied from the parent connection pool at construction time.</summary>
    public RabbitMqOptions Options => _options;

    /// <summary>The underlying RabbitMQ connection; <c>null</c> after <see cref="Dispose"/>.</summary>
    public IConnection Connection => _connection;

    /// <summary>Logger created for this connection's runtime type.</summary>
    public ILogger Logger => _logger;

    /// <summary>
    /// Opens a new connection through the parent pool's connection factory.
    /// </summary>
    /// <param name="parentPool">Pool that owns this connection; supplies options and the connection factory.</param>
    /// <param name="channleMax">Maximum number of channels this connection may create and hold.</param>
    /// <exception cref="ArgumentNullException">
    /// The parent pool has no connection factory, or the factory returned <c>null</c>.
    /// </exception>
    public RabbitMqConnection(ConnectionPool parentPool, ushort channleMax = 50)
    {
        Check.NotNull(parentPool, nameof(parentPool));
        _options = parentPool.Options;
        _connection = parentPool.ConnectionFactory?.CreateConnection() ?? throw new ArgumentNullException(nameof(parentPool));
        _logger = RabbitMqConnectionTable.LoggerFactory.CreateLogger(GetType());
        _maxSize = channleMax;
        // NOTE(review): SetPool is virtual, so an override in a derived class runs here
        // before that derived class's constructor body has executed.
        SetPool(parentPool);
    }

    /// <summary>
    /// Creates a new channel bound to this connection. The channel is not tracked by the
    /// pool counters; use <see cref="Rent"/> for pooled access.
    /// </summary>
    /// <returns>A newly constructed <see cref="RabbitMqChannel"/>.</returns>
    public RabbitMqChannel CreateChannel()
    {
        return new RabbitMqChannel(this);
    }

    /// <summary>
    /// Declares every configured exchange whose <c>ServerName</c> equals
    /// <paramref name="serverName"/>, and declares and binds each of its bound queues.
    /// </summary>
    /// <param name="serverName">Server name used to filter the configured exchanges.</param>
    /// <exception cref="InvalidOperationException">No channel could be rented from the pool.</exception>
    public void ExchangesDeclare(string serverName)
    {
        var channel = RentOrThrow();
        try
        {
            var model = channel.Channel;
            foreach (var exchange in _options.Exchanges.Where(e => e.ServerName == serverName))
            {
                model.ExchangeDeclare(exchange.Name, exchange.Type, exchange.Durable, exchange.AutoDelete, exchange.Arguements);
                foreach (var queue in exchange.BindedQueues)
                {
                    model.QueueDeclare(queue.Name, queue.Durable, queue.Exclusive, queue.AutoDelete, queue.Arguements);
                    // The binding uses the exchange's route key and the exchange's arguments.
                    model.QueueBind(queue.Name, exchange.Name, exchange.RouteKey, exchange.Arguements);
                }
            }
        }
        finally
        {
            // Always give the channel back so a failed declaration does not cost a pool slot.
            // NOTE(review): a channel closed by a broker error is returned as-is; whether it
            // is validated on the next rent depends on code not visible here.
            Return(channel);
        }
    }

    /// <summary>
    /// Declares every configured queue whose <c>ServerName</c> equals
    /// <paramref name="serverName"/>.
    /// </summary>
    /// <param name="serverName">Server name used to filter the configured queues.</param>
    /// <exception cref="InvalidOperationException">No channel could be rented from the pool.</exception>
    public void QueuesDeclare(string serverName)
    {
        var channel = RentOrThrow();
        try
        {
            var model = channel.Channel;
            foreach (var queue in _options.Queues.Where(q => q.ServerName == serverName))
            {
                model.QueueDeclare(queue.Name, queue.Durable, queue.Exclusive, queue.AutoDelete, queue.Arguements);
            }
        }
        finally
        {
            // Always give the channel back so a failed declaration does not cost a pool slot.
            Return(channel);
        }
    }

    /// <summary>
    /// Takes a channel from the pool, creating a new one while fewer than the maximum
    /// number of channels have been created.
    /// </summary>
    /// <returns>
    /// A pooled or newly created channel, or <c>default</c> when the pool is empty and the
    /// creation limit has been reached.
    /// </returns>
    public override RabbitMqChannel Rent()
    {
        if (_pool.TryDequeue(out RabbitMqChannel pooled))
        {
            return pooled;
        }

        // Reserve a creation slot first; undo the reservation if the limit is exceeded.
        if (Interlocked.Increment(ref _createdCount) > _maxSize)
        {
            Interlocked.Decrement(ref _createdCount);
            return default;
        }

        try
        {
            var created = CreateChannel();
            created.SetPool(this);
            return created;
        }
        catch
        {
            // Creation failed, so release the reserved slot; otherwise capacity would
            // shrink permanently with every failed attempt.
            Interlocked.Decrement(ref _createdCount);
            throw;
        }
    }

    /// <summary>
    /// Puts a channel back into the pool.
    /// </summary>
    /// <param name="model">The channel being returned.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// The pool already holds the maximum number of channels.
    /// </exception>
    public override void Return(RabbitMqChannel model)
    {
        int pooledCount = Count;
        if (pooledCount >= _maxSize)
        {
            throw new ArgumentOutOfRangeException(nameof(model));
        }

        _pool.Enqueue(model);
    }

    /// <summary>
    /// Disposes the base pool, then closes and releases the underlying connection.
    /// Calls after the first have no effect.
    /// </summary>
    public override void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        base.Dispose();
        _connection?.Close();
        _connection = null;
        _disposed = true;
    }

    /// <summary>
    /// Records the connection pool that owns this connection.
    /// </summary>
    /// <param name="pool">The owning pool; validated as a non-null <see cref="ConnectionPool"/>.</param>
    public virtual void SetPool(object pool)
    {
        Check.NotNull(pool, nameof(pool));
        Check.IsType<ConnectionPool>(pool, nameof(pool));
        _connectionPool = pool as ConnectionPool;
    }

    /// <summary>
    /// Rents a channel and fails with a descriptive exception instead of handing back
    /// <c>null</c> when the pool is exhausted.
    /// </summary>
    private RabbitMqChannel RentOrThrow()
    {
        var channel = Rent();
        if (channel == null)
        {
            throw new InvalidOperationException("No RabbitMQ channel is available: the channel pool of this connection is exhausted.");
        }

        return channel;
    }
}
}