Skip to content

Commit 5cf3e25

Browse files
authored
Using sync version of subscribe in HybridCachingProvider (#455)
* fix: using sync version of subscribe in HybridCachingProvider(#421) * feat: enable SubscribeAsync
1 parent 3e321cc commit 5cf3e25

File tree

10 files changed

+130
-5
lines changed

10 files changed

+130
-5
lines changed

bus/EasyCaching.Bus.CSRedis/DefaultCSRedisBus.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,21 @@ public override void BaseSubscribe(string topic, Action<EasyCachingMessage> acti
6464
);
6565
}
6666

67+
/// <summary>
68+
/// Subscribe the specified topic and action async.
69+
/// </summary>
70+
/// <param name="topic">Topic.</param>
71+
/// <param name="action">Action.</param>
72+
/// <param name="cancellationToken">Cancellation token.</param>
73+
public override Task BaseSubscribeAsync(string topic, Action<EasyCachingMessage> action, CancellationToken cancellationToken = default(CancellationToken))
74+
{
75+
_client.Subscribe(
76+
(topic, msg => OnMessage(msg.Body))
77+
);
78+
return Task.CompletedTask;
79+
}
80+
81+
6782
/// <summary>
6883
/// Ons the message.
6984
/// </summary>

bus/EasyCaching.Bus.ConfluentKafka/DefaultConfluentKafkaBus.cs

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
using Microsoft.Extensions.Options;
1313

1414
public class DefaultConfluentKafkaBus : EasyCachingAbstractBus
15-
{
15+
{
1616

1717

1818
/// <summary>
@@ -77,7 +77,7 @@ public override void BasePublish(string topic, EasyCachingMessage message)
7777
/// <param name="cancellationToken">Cancellation token.</param>
7878
public override async Task BasePublishAsync(string topic, EasyCachingMessage message, CancellationToken cancellationToken = default(CancellationToken))
7979
{
80-
var msg = _serializer.Serialize(message);
80+
var msg = _serializer.Serialize(message);
8181

8282
await _producer.ProduceAsync(topic, new Message<Null, byte[]> { Value = msg });
8383
}
@@ -89,7 +89,27 @@ public override void BasePublish(string topic, EasyCachingMessage message)
8989
/// <param name="action">Action.</param>
9090
public override void BaseSubscribe(string topic, Action<EasyCachingMessage> action)
9191
{
92-
Task.Factory.StartNew(() =>
92+
_ = StartConsumer(topic);
93+
}
94+
95+
/// <summary>
96+
/// Subscribe the specified topic and action async.
97+
/// </summary>
98+
/// <param name="topic">Topic.</param>
99+
/// <param name="action">Action.</param>
100+
/// <param name="cancellationToken">Cancellation token.</param>
101+
public override async Task BaseSubscribeAsync(string topic, Action<EasyCachingMessage> action, CancellationToken cancellationToken = default(CancellationToken))
102+
{
103+
await StartConsumer(topic);
104+
}
105+
106+
/// <summary>
107+
/// Ons the consumer.
108+
/// </summary>
109+
/// <param name="topic">Topic</param>
110+
private Task StartConsumer(string topic)
111+
{
112+
return Task.Factory.StartNew(() =>
93113
{
94114
for (int i = 0; i < this._kafkaBusOptions.ConsumerCount; i++)
95115
{

bus/EasyCaching.Bus.RabbitMQ/DefaultRabbitMQBus.cs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ IPooledObjectPolicy<IModel> _objectPolicy
6565
RequestedConnectionTimeout = System.TimeSpan.FromMilliseconds(_options.RequestedConnectionTimeout),
6666
SocketReadTimeout = System.TimeSpan.FromMilliseconds(_options.SocketReadTimeout),
6767
SocketWriteTimeout = System.TimeSpan.FromMilliseconds(_options.SocketWriteTimeout),
68-
ClientProvidedName = _options.ClientProvidedName
68+
ClientProvidedName = _options.ClientProvidedName,
6969
};
7070

7171
_subConnection = factory.CreateConnection();
@@ -156,6 +156,28 @@ public override void BaseSubscribe(string topic, Action<EasyCachingMessage> acti
156156
}
157157

158158

159+
/// <summary>
160+
/// Subscribe the specified topic and action async.
161+
/// </summary>
162+
/// <param name="topic">Topic.</param>
163+
/// <param name="action">Action.</param>
164+
/// <param name="cancellationToken">Cancellation token.</param>
165+
public override Task BaseSubscribeAsync(string topic, Action<EasyCachingMessage> action, CancellationToken cancellationToken = default(CancellationToken))
166+
{
167+
var queueName = string.Empty;
168+
if (string.IsNullOrWhiteSpace(_options.QueueName))
169+
{
170+
queueName = $"rmq.queue.undurable.easycaching.subscriber.{_busId}";
171+
}
172+
else
173+
{
174+
queueName = _options.QueueName;
175+
}
176+
177+
StartConsumer(queueName, topic);
178+
return Task.CompletedTask;
179+
}
180+
159181
private void StartConsumer(string queueName, string topic)
160182
{
161183
var model = _subConnection.CreateModel();

bus/EasyCaching.Bus.Redis/DefaultRedisBus.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,5 +104,16 @@ public override void BaseSubscribe(string topic, Action<EasyCachingMessage> acti
104104
{
105105
_subscriber.Subscribe(topic, OnMessage);
106106
}
107+
108+
/// <summary>
109+
/// Subscribe the specified topic and action async.
110+
/// </summary>
111+
/// <param name="topic">Topic.</param>
112+
/// <param name="action">Action.</param>
113+
/// <param name="cancellationToken">Cancellation token.</param>
114+
public override async Task BaseSubscribeAsync(string topic, Action<EasyCachingMessage> action, CancellationToken cancellationToken = default(CancellationToken))
115+
{
116+
await _subscriber.SubscribeAsync(topic, OnMessage);
117+
}
107118
}
108119
}

bus/EasyCaching.Bus.Zookeeper/DefaultZookeeperBus.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,18 @@ public override void BaseSubscribe(string topic, Action<EasyCachingMessage> acti
116116
}, TaskCreationOptions.LongRunning);
117117
}
118118

119+
/// <summary>
120+
/// Subscribe the specified topic and action async.
121+
/// </summary>
122+
/// <param name="topic">Topic.</param>
123+
/// <param name="action">Action.</param>
124+
/// <param name="cancellationToken">Cancellation token.</param>
125+
public override async Task BaseSubscribeAsync(string topic, Action<EasyCachingMessage> action, CancellationToken cancellationToken = default(CancellationToken))
126+
{
127+
var path = $"{topic}";
128+
await SubscribeDataChangeAsync(path, SubscribeDataChange);
129+
}
130+
119131
/// <summary>
120132
/// Ons the message.
121133
/// </summary>

src/EasyCaching.Core/Bus/IEasyCachingSubscriber.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
namespace EasyCaching.Core.Bus
22
{
33
using System;
4+
using System.Threading;
5+
using System.Threading.Tasks;
46

57
/// <summary>
68
/// EasyCaching subscriber.
@@ -14,5 +16,14 @@ public interface IEasyCachingSubscriber
1416
/// <param name="action">Action.</param>
1517
/// <param name="reconnectAction"> Reconnect Action.</param>
1618
void Subscribe(string topic, Action<EasyCachingMessage> action, Action reconnectAction = null);
19+
20+
/// <summary>
21+
/// Subscribe the specified topic and action async.
22+
/// </summary>
23+
/// <param name="topic">Topic.</param>
24+
/// <param name="action">Action.</param>
25+
/// <param name="reconnectAction"> Reconnect Action.</param>
26+
/// <param name="cancellationToken">Cancellation token.</param>
27+
Task SubscribeAsync(string topic, Action<EasyCachingMessage> action, Action reconnectAction = null, CancellationToken cancellationToken = default(CancellationToken));
1728
}
1829
}

src/EasyCaching.Core/Bus/NullEasyCachingBus.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,5 +59,17 @@ public void Subscribe(string topic, Action<EasyCachingMessage> action, Action re
5959
{
6060

6161
}
62+
63+
/// <summary>
64+
/// Subscribe the specified topic and action async.
65+
/// </summary>
66+
/// <param name="topic">Topic.</param>
67+
/// <param name="action">Action.</param>
68+
/// <param name="reconnectAction">Reconnect Action.</param>
69+
/// <param name="cancellationToken">Cancellation token.</param>
70+
public Task SubscribeAsync(string topic, Action<EasyCachingMessage> action, Action reconnectAction = null, CancellationToken cancellationToken = default(CancellationToken))
71+
{
72+
return Task.CompletedTask;
73+
}
6274
}
6375
}

src/EasyCaching.Core/EasyCachingAbstractBus.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ public abstract class EasyCachingAbstractBus : IEasyCachingBus
1515
public abstract void BasePublish(string topic, EasyCachingMessage message);
1616
public abstract Task BasePublishAsync(string topic, EasyCachingMessage message, CancellationToken cancellationToken = default(CancellationToken));
1717
public abstract void BaseSubscribe(string topic, Action<EasyCachingMessage> action);
18+
public abstract Task BaseSubscribeAsync(string topic, Action<EasyCachingMessage> action, CancellationToken cancellationToken = default(CancellationToken));
1819

1920
protected Action<EasyCachingMessage> _handler;
2021

@@ -83,6 +84,13 @@ public void Subscribe(string topic, Action<EasyCachingMessage> action, Action re
8384
BaseSubscribe(topic, action);
8485
}
8586

87+
public async Task SubscribeAsync(string topic, Action<EasyCachingMessage> action, Action reconnectAction, CancellationToken cancellationToken = default(CancellationToken))
88+
{
89+
_handler = action;
90+
_reconnectHandler = reconnectAction;
91+
await BaseSubscribeAsync(topic, action);
92+
}
93+
8694
public virtual void BaseOnMessage(EasyCachingMessage message)
8795
{
8896
var operationId = s_diagnosticListener.WriteSubscribeMessageBefore(new BeforeSubscribeMessageRequestEventData(message));

src/EasyCaching.HybridCache/HybridCachingProvider.cs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ string name
9393
else this._distributedCache = distributed;
9494

9595
this._bus = bus ?? NullEasyCachingBus.Instance;
96-
this._bus.Subscribe(_options.TopicName, OnMessage, OnReconnect);
96+
_ = SubscribeAsync();
9797

9898
this._cacheId = Guid.NewGuid().ToString("N");
9999

@@ -117,6 +117,15 @@ string name
117117
_busAsyncWrap = Policy.WrapAsync(fallbackAsyncPolicy, retryAsyncPolicy);
118118
}
119119

120+
/// <summary>
121+
/// Subscribe the topic
122+
/// </summary>
123+
/// <returns></returns>
124+
private async Task SubscribeAsync()
125+
{
126+
await _bus.SubscribeAsync(_options.TopicName, OnMessage, OnReconnect);
127+
}
128+
120129
/// <summary>
121130
/// Ons the message.
122131
/// </summary>

test/EasyCaching.UnitTests/Fake/FakeBus.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,10 @@ public void Subscribe(string topic, Action<EasyCachingMessage> action, Action re
2323
{
2424

2525
}
26+
27+
public Task SubscribeAsync(string topic, Action<EasyCachingMessage> action, Action reconnectAction = null, CancellationToken cancellationToken = default(CancellationToken))
28+
{
29+
return Task.CompletedTask;
30+
}
2631
}
2732
}

0 commit comments

Comments
 (0)