Skip to content

Commit a918dca

Browse files
committed
连接池实现连接和通道的复用。但还没有实现消费者绑定队列获取消息的功能。
1 parent 49d02d5 commit a918dca

12 files changed

+713
-0
lines changed

Abtraction/AbractPool.cs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
using Microsoft.Extensions.Logging;
2+
using System;
3+
using System.Collections.Concurrent;
4+
using System.Collections.Generic;
5+
using System.Text;
6+
7+
namespace RabbitMq.Abtraction
8+
{
9+
public abstract class AbractPool<T> : IPool<T> where T : IPoolable
10+
{
11+
protected readonly ConcurrentQueue<T> _pool = new ConcurrentQueue<T>();
12+
13+
protected int _maxSize;
14+
15+
protected int _createdCount;
16+
17+
protected bool _disposed;
18+
19+
protected ILogger _logger;
20+
21+
protected virtual int Count { get => _pool.Count; }
22+
23+
public virtual void Dispose()
24+
{
25+
if (!_pool.IsEmpty)
26+
{
27+
while (_pool.TryDequeue(out T item))
28+
{
29+
item?.Dispose();
30+
}
31+
}
32+
}
33+
34+
public abstract T Rent();
35+
36+
public abstract void Return(T t);
37+
}
38+
}

Abtraction/IPool.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
5+
namespace RabbitMq.Abtraction
6+
{
7+
public interface IPool<T> : IDisposable where T : IDisposable
8+
{
9+
T Rent();
10+
11+
void Return(T t);
12+
}
13+
}

Abtraction/IPoolable.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
5+
namespace RabbitMq.Abtraction
6+
{
7+
public interface IPoolable : IDisposable
8+
{
9+
void SetPool(object pool);
10+
}
11+
}

Events/ConfigEventArgs.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
5+
namespace RabbitMq.Events
6+
{
7+
public class ConfigEventArgs : EventArgs
8+
{
9+
public RabbitMqOptions Options { get; set; }
10+
}
11+
}

Extensions/ConnectionExtension.cs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
using Microsoft.Extensions.Logging;
2+
using System;
3+
using System.Collections.Generic;
4+
using System.Text;
5+
using System.Threading.Tasks;
6+
7+
namespace RabbitMq.Extensions
8+
{
9+
public static class ConnectionExtension
10+
{
11+
public static Task SendMessageAsync(this RabbitMqConnection connection, string exchangeName, string routeKey, byte[] message)
12+
{
13+
return Task.Run(() =>
14+
{
15+
var channel = connection.Rent();
16+
if (!channel.SendMessage(exchangeName, routeKey, message))
17+
{
18+
connection.Logger.LogError("send message failed");
19+
}
20+
connection.Return(channel);
21+
});
22+
23+
}
24+
}
25+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
using Microsoft.Extensions.Configuration;
2+
using Microsoft.Extensions.DependencyInjection;
3+
using System;
4+
using System.Collections.Generic;
5+
using System.Text;
6+
7+
namespace RabbitMq.DependencyInjection
8+
{
9+
public static class DependencyInjectionExtension
10+
{
11+
public static IServiceCollection AddRabbitMqHelper(this IServiceCollection services, Action<RabbitMqOptions> build, IConfiguration configuration)
12+
{
13+
var options = new RabbitMqOptions();
14+
options.Configuration = configuration;
15+
build(options);
16+
17+
services.AddSingleton(options);
18+
services.AddSingleton<RabbitMqConnectionTable>();
19+
services.AddSingleton<RabbitMqHelper>();
20+
21+
return services;
22+
}
23+
}
24+
}

RabbitMq.csproj

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<TargetFramework>netcoreapp3.1</TargetFramework>
5+
</PropertyGroup>
6+
7+
<ItemGroup>
8+
<PackageReference Include="RabbitMQ.Client" Version="5.1.2" />
9+
</ItemGroup>
10+
11+
<ItemGroup>
12+
<ProjectReference Include="..\Core\Core.csproj" />
13+
</ItemGroup>
14+
15+
</Project>

RabbitMqChannel.cs

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
using Core.Extensions;
2+
using Microsoft.Extensions.Logging;
3+
using RabbitMq.Abtraction;
4+
using RabbitMq.Events;
5+
using RabbitMQ.Client;
6+
using System;
7+
using System.Collections.Generic;
8+
using System.Text;
9+
using System.Threading.Tasks;
10+
11+
namespace RabbitMq
12+
{
13+
public class RabbitMqChannel : IDisposable, IPoolable
14+
{
15+
private RabbitMqConnection _connection;
16+
17+
private IModel _channel;
18+
19+
private bool _disposed;
20+
21+
private readonly RabbitMqOptions _options;
22+
23+
private readonly ILogger _logger;
24+
25+
public IModel Channel { get => _channel; }
26+
/// <summary>
27+
/// 通道配置的事件回调。可以由用户自己配置再进行与mq服务的消息传递。
28+
/// 调用<see cref="HandleChannelConfig(RabbitMqOptions)"/>使配置生效。
29+
/// </summary>
30+
public EventHandler<ConfigEventArgs> ChannelConfiged { get; set; }
31+
32+
public bool Configed { get; private set; }
33+
34+
public RabbitMqChannel(RabbitMqConnection connection)
35+
{
36+
Check.NotNull(connection, nameof(connection));
37+
38+
_logger = RabbitMqConnectionTable.LoggerFactory.CreateLogger(GetType());
39+
_channel = connection.Connection?.CreateModel();
40+
_options = connection.Options;
41+
SetPool(connection);
42+
}
43+
44+
public Task<bool> SendAndDeleteAsync(Exchange exchange, Queue queue, byte[] body, IBasicProperties properties = default, bool mandatory = default)
45+
{
46+
if (Channel.IsOpen)
47+
{
48+
Channel.ExchangeDeclare(exchange.Name, exchange.Type, exchange.Durable, exchange.AutoDelete, null);
49+
Channel.QueueDeclare(queue.Name, queue.Durable, queue.Exclusive, queue.AutoDelete, null);
50+
Channel.QueueBind(queue.Name, exchange.Name, exchange.RouteKey, null);
51+
Channel.BasicPublish(exchange.Name, exchange.RouteKey, mandatory, properties, body);
52+
return Task.Run(() =>
53+
{
54+
55+
var result = Channel.WaitForConfirms();
56+
Channel.QueueDelete(queue.Name, true, true);
57+
Channel.ExchangeDelete(exchange.Name, true);
58+
return result;
59+
});
60+
}
61+
else
62+
{
63+
_logger.LogInformation("class-id:{0}\nreply-code:{1}\nreply-text:{2}", Channel.CloseReason.ClassId, Channel.CloseReason.ReplyCode, Channel.CloseReason.ReplyText);
64+
return Task.FromResult(false);
65+
}
66+
}
67+
68+
public bool SendMessage(string exchangeName, string routeKey, byte[] body,IBasicProperties properties = default, bool mandatory = default)
69+
{
70+
if (Channel.IsOpen)
71+
{
72+
Channel.BasicPublish(exchangeName, routeKey, mandatory, properties, body);
73+
return Channel.WaitForConfirms();
74+
}
75+
else
76+
{
77+
_logger.LogInformation("class-id:{0}\nreply-code:{1}\nreply-text:{2}", Channel.CloseReason.ClassId, Channel.CloseReason.ReplyCode, Channel.CloseReason.ReplyText);
78+
return false;
79+
}
80+
}
81+
82+
public virtual void Dispose()
83+
{
84+
if (!_disposed)
85+
{
86+
_channel?.Close();
87+
_channel = null;
88+
_connection = null;
89+
_disposed = true;
90+
}
91+
}
92+
93+
public virtual void SetPool(object pool)
94+
{
95+
Check.NotNull(pool, nameof(pool));
96+
Check.IsType<RabbitMqConnection>(pool, nameof(pool));
97+
98+
_connection = pool as RabbitMqConnection;
99+
}
100+
101+
/// <summary>
102+
/// 执行通道(channel)通道配置事件。调用事件<see cref="ChannelConfiged"/>,并设置<see cref="Configed"/>为true
103+
/// </summary>
104+
/// <param name="options">配置选项</param>
105+
/// <seealso cref="ChannelConfiged"/>
106+
public void HandleChannelConfig(RabbitMqOptions options)
107+
{
108+
Check.NotNull(options, nameof(options));
109+
110+
var source = new ConfigEventArgs()
111+
{
112+
Options = options
113+
};
114+
115+
if (ChannelConfiged != null)
116+
{
117+
ChannelConfiged(this, source);
118+
Configed = true;
119+
}
120+
}
121+
}
122+
}

RabbitMqConnection.cs

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
using Core.Extensions;
2+
using Microsoft.Extensions.Logging;
3+
using RabbitMq.Abtraction;
4+
using RabbitMQ.Client;
5+
using RabbitMQ.Client.Framing.Impl;
6+
using System;
7+
using System.Collections.Concurrent;
8+
using System.Collections.Generic;
9+
using System.Linq;
10+
using System.Text;
11+
using System.Threading;
12+
using System.Threading.Tasks;
13+
using static RabbitMq.RabbitMqConnectionTable;
14+
15+
namespace RabbitMq
16+
{
17+
public class RabbitMqConnection : AbractPool<RabbitMqChannel>, IPoolable
18+
{
19+
private IConnection _connection;
20+
21+
private ConnectionPool _connectionPool;
22+
23+
private readonly RabbitMqOptions _options;
24+
25+
public RabbitMqOptions Options { get => _options; }
26+
27+
public IConnection Connection { get => _connection; }
28+
29+
public ILogger Logger { get => _logger; }
30+
31+
public RabbitMqConnection(ConnectionPool parentPool, ushort channleMax = 50)
32+
{
33+
Check.NotNull(parentPool, nameof(parentPool));
34+
35+
_options = parentPool.Options;
36+
_connection = parentPool.ConnectionFactory?.CreateConnection() ?? throw new ArgumentNullException(nameof(parentPool));
37+
_logger = RabbitMqConnectionTable.LoggerFactory.CreateLogger(GetType());
38+
_maxSize = channleMax;
39+
SetPool(parentPool);
40+
}
41+
42+
public RabbitMqChannel CreateChannel()
43+
{
44+
var channel = new RabbitMqChannel(this);
45+
return channel;
46+
}
47+
48+
public void ExchangesDeclare(string serverName)
49+
{
50+
var channel = Rent();
51+
var model = channel.Channel;
52+
foreach (var exchange in _options.Exchanges.Where(e => e.ServerName == serverName))
53+
{
54+
model.ExchangeDeclare(exchange.Name, exchange.Type, exchange.Durable, exchange.AutoDelete, exchange.Arguements);
55+
foreach (var queue in exchange.BindedQueues)
56+
{
57+
model.QueueDeclare(queue.Name, queue.Durable, queue.Exclusive, queue.AutoDelete, queue.Arguements);
58+
model.QueueBind(queue.Name, exchange.Name, exchange.RouteKey, exchange.Arguements);
59+
}
60+
}
61+
Return(channel);
62+
}
63+
64+
public void QueuesDeclare(string serverName)
65+
{
66+
var channel = Rent();
67+
var model = channel.Channel;
68+
foreach (var queue in _options.Queues.Where(q => q.ServerName == serverName))
69+
{
70+
model.QueueDeclare(queue.Name, queue.Durable, queue.Exclusive, queue.AutoDelete, queue.Arguements);
71+
}
72+
Return(channel);
73+
}
74+
75+
public override RabbitMqChannel Rent()
76+
{
77+
if (_pool.TryDequeue(out RabbitMqChannel channel))
78+
{
79+
return channel;
80+
}
81+
82+
else if (Interlocked.Increment(ref _createdCount) <= _maxSize)
83+
{
84+
channel = CreateChannel();
85+
channel.SetPool(this);
86+
return channel;
87+
}
88+
89+
else
90+
{
91+
Interlocked.Decrement(ref _createdCount);
92+
return default;
93+
}
94+
}
95+
96+
public override void Return(RabbitMqChannel model)
97+
{
98+
int currentCount = Count;
99+
if (Interlocked.Increment(ref currentCount) <= _maxSize)
100+
{
101+
_pool.Enqueue(model);
102+
}
103+
else
104+
{
105+
throw new ArgumentOutOfRangeException(nameof(model));
106+
}
107+
}
108+
109+
public override void Dispose()
110+
{
111+
if (!_disposed)
112+
{
113+
base.Dispose();
114+
_connection?.Close();
115+
116+
_connection = null;
117+
_disposed = true;
118+
}
119+
}
120+
121+
public virtual void SetPool(object pool)
122+
{
123+
Check.NotNull(pool, nameof(pool));
124+
Check.IsType<ConnectionPool>(pool, nameof(pool));
125+
126+
_connectionPool = pool as ConnectionPool;
127+
}
128+
}
129+
}

0 commit comments

Comments
 (0)