-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
Copy pathServiceBusMqServer.cs
193 lines (158 loc) · 6.33 KB
/
ServiceBusMqServer.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
using ServiceStack.Messaging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ServiceStack.Text;
#if !NETFRAMEWORK
using Microsoft.Azure.ServiceBus.Management;
#else
using Microsoft.ServiceBus;
#endif
namespace ServiceStack.Azure.Messaging;
/// <summary>
/// <see cref="IMessageService"/> implementation that registers message handlers per request type and
/// delegates starting and stopping of the queues to a <see cref="ServiceBusMqMessageFactory"/>.
/// </summary>
public class ServiceBusMqServer : IMessageService
{
    private int retryCount = 1;

    /// <summary>
    /// Retry count handed to every handler factory created by this server. Defaults to 1.
    /// </summary>
    /// <exception cref="ArgumentOutOfRangeException">The assigned value is negative.</exception>
    public int RetryCount
    {
        get => retryCount;
        set
        {
            // Report the setter's implicit parameter, not the private backing field.
            if (value < 0)
            {
                throw new ArgumentOutOfRangeException(nameof(value));
            }

            retryCount = value;
        }
    }

    /// <summary>
    /// Creates the server together with the <see cref="ServiceBusMqMessageFactory"/> it uses.
    /// </summary>
    /// <param name="connectionString">Connection string forwarded unchanged to the message factory.</param>
    public ServiceBusMqServer(string connectionString)
    {
        messageFactory = new ServiceBusMqMessageFactory(this, connectionString);
    }

    private readonly ServiceBusMqMessageFactory? messageFactory;

    /// <summary>
    /// The message factory created in the constructor.
    /// </summary>
    /// <exception cref="ArgumentNullException">The underlying factory field is null.</exception>
    public IMessageFactory MessageFactory => messageFactory ?? throw new ArgumentNullException(nameof(messageFactory));

    /// <summary>
    /// Optional factory returning an <see cref="IOneWayClient"/> for a given string key.
    /// Not read anywhere in this class. NOTE(review): presumably keyed by reply-to address - confirm against consumers.
    /// </summary>
    public Func<string, IOneWayClient> ReplyClientFactory { get; set; }

    /// <summary>
    /// Execute global transformation or custom logic before a request is processed.
    /// Must be thread-safe.
    /// </summary>
    public Func<IMessage, IMessage> RequestFilter { get; set; }

    /// <summary>
    /// Execute global transformation or custom logic on the response.
    /// Must be thread-safe.
    /// </summary>
    public Func<object, object> ResponseFilter { get; set; }

#if !NETFRAMEWORK
    /// <summary>
    /// Exposes the <see cref="Microsoft.Azure.ServiceBus.Management.ManagementClient"/> which can be used to perform
    /// management operations on ServiceBus entities.
    /// </summary>
    public ManagementClient ManagementClient => messageFactory!.managementClient;
#else
    /// <summary>
    /// Exposes the <see cref="Microsoft.ServiceBus.NamespaceManager"/> which can be used in managing entities,
    /// such as queues, topics, subscriptions, and rules, in your service namespace.
    /// </summary>
    public NamespaceManager NamespaceManager => messageFactory!.namespaceManager;
#endif

    private readonly Dictionary<Type, IMessageHandlerFactory> handlerMap = new();

    /// <summary>
    /// Handler factories keyed by the message type they were registered for.
    /// </summary>
    protected internal Dictionary<Type, IMessageHandlerFactory> HandlerMap => handlerMap;

    // Thread count requested at registration, keyed by message type; passed to the factory in Start().
    private readonly Dictionary<Type, int> handlerThreadCountMap = new();

    /// <summary>
    /// A new list, built on every access, of the message types that currently have a handler registered.
    /// </summary>
    public List<Type> RegisteredTypes => handlerMap.Keys.ToList();

    /// <summary>
    /// Opt-in to only publish responses on this white list.
    /// Publishes all responses by default.
    /// </summary>
    public string[]? PublishResponsesWhitelist { get; set; }

    /// <summary>
    /// Write-only switch: <c>true</c> sets <see cref="PublishResponsesWhitelist"/> to an empty array,
    /// <c>false</c> resets it to <c>null</c>.
    /// </summary>
    public bool DisablePublishingResponses
    {
        set => PublishResponsesWhitelist = value ? TypeConstants.EmptyStringArray : null;
    }

    /// <summary>
    /// Opt-in to only publish .outq messages on this white list.
    /// Publishes all responses by default.
    /// </summary>
    public string[]? PublishToOutqWhitelist { get; set; }

    /// <summary>
    /// Don't publish any messages to .outq
    /// </summary>
    public bool DisablePublishingToOutq
    {
        set => PublishToOutqWhitelist = value ? TypeConstants.EmptyStringArray : null;
    }

    /// <summary>
    /// Disable publishing .outq Messages for Responses with no return type
    /// </summary>
    public bool DisableNotifyMessages { get; set; }

#if !NETFRAMEWORK
    /// <summary>
    /// Gets or sets the publish filter stored on the underlying message factory.
    /// </summary>
    public Action<Microsoft.Azure.ServiceBus.Message,IMessage> PublishMessageFilter
    {
        get => messageFactory!.PublishMessageFilter;
        set => messageFactory!.PublishMessageFilter = value;
    }
#else
    /// <summary>
    /// Gets or sets the publish filter stored on the underlying message factory.
    /// </summary>
    public Action<Microsoft.ServiceBus.Messaging.BrokeredMessage,IMessage> PublishMessageFilter
    {
        get => messageFactory!.PublishMessageFilter;
        set => messageFactory!.PublishMessageFilter = value;
    }
#endif

    /// <summary>
    /// Stops the queues on the underlying message factory, if there is one.
    /// </summary>
    public void Dispose()
    {
        // Use the typed field directly: the MessageFactory getter throws when the field is null,
        // and Dispose should not throw.
        messageFactory?.StopQueues();
    }

    /// <summary>
    /// Returns the handler statistics reported by the underlying message factory.
    /// </summary>
    public IMessageHandlerStats GetStats()
    {
        return ServiceBusMessageFactory.GetStats();
    }

    /// <summary>
    /// Returns the status string reported by the underlying message factory.
    /// </summary>
    public string GetStatus()
    {
        return ServiceBusMessageFactory.GetStatus();
    }

    /// <summary>
    /// Returns the statistics description reported by the underlying message factory.
    /// </summary>
    public string GetStatsDescription()
    {
        return ServiceBusMessageFactory.GetStatsDescription();
    }

    /// <summary>
    /// Registers a handler for messages of type <typeparamref name="T"/> with one thread and no exception callback.
    /// </summary>
    /// <param name="processMessageFn">Function invoked to process each message.</param>
    /// <exception cref="ArgumentException">A handler for <typeparamref name="T"/> is already registered.</exception>
    public void RegisterHandler<T>(Func<IMessage<T>, object> processMessageFn)
    {
        RegisterHandler(processMessageFn, null, noOfThreads: 1);
    }

    /// <summary>
    /// Registers a handler for messages of type <typeparamref name="T"/> with no exception callback.
    /// </summary>
    /// <param name="processMessageFn">Function invoked to process each message.</param>
    /// <param name="noOfThreads">Thread count recorded for this message type.</param>
    /// <exception cref="ArgumentException">A handler for <typeparamref name="T"/> is already registered.</exception>
    public void RegisterHandler<T>(Func<IMessage<T>, object> processMessageFn, int noOfThreads)
    {
        RegisterHandler(processMessageFn, null, noOfThreads);
    }

    /// <summary>
    /// Registers a handler for messages of type <typeparamref name="T"/> with one thread.
    /// </summary>
    /// <param name="processMessageFn">Function invoked to process each message.</param>
    /// <param name="processExceptionEx">Callback passed to the handler factory for exception handling.</param>
    /// <exception cref="ArgumentException">A handler for <typeparamref name="T"/> is already registered.</exception>
    public void RegisterHandler<T>(Func<IMessage<T>, object> processMessageFn, Action<IMessageHandler, IMessage<T>, Exception> processExceptionEx)
    {
        RegisterHandler(processMessageFn, processExceptionEx, noOfThreads: 1);
    }

    /// <summary>
    /// Registers a handler for messages of type <typeparamref name="T"/>, records its thread count and
    /// then asserts licensed usage against the number of registered handlers.
    /// </summary>
    /// <param name="processMessageFn">Function invoked to process each message.</param>
    /// <param name="processExceptionEx">Optional callback passed to the handler factory for exception handling.</param>
    /// <param name="noOfThreads">Thread count recorded for this message type.</param>
    /// <exception cref="ArgumentException">A handler for <typeparamref name="T"/> is already registered.</exception>
    public void RegisterHandler<T>(Func<IMessage<T>, object> processMessageFn, Action<IMessageHandler, IMessage<T>, Exception>? processExceptionEx, int noOfThreads)
    {
        var messageType = typeof(T);
        if (handlerMap.ContainsKey(messageType))
        {
            throw new ArgumentException($"Message handler has already been registered for type: {messageType.Name}");
        }

        handlerMap[messageType] = CreateMessageHandlerFactory(processMessageFn, processExceptionEx);
        handlerThreadCountMap[messageType] = noOfThreads;

        // The quota check runs after the handler is added so the count includes the new registration.
        LicenseUtils.AssertValidUsage(LicenseFeature.ServiceStack, QuotaType.Operations, handlerMap.Count);
    }

    /// <summary>
    /// Creates the handler factory for <typeparamref name="T"/>, initialised with the values that this server's
    /// filters, retry count and whitelists hold at the time of the call.
    /// </summary>
    /// <param name="processMessageFn">Function invoked to process each message.</param>
    /// <param name="processExceptionEx">Optional callback passed to the handler factory for exception handling.</param>
    /// <returns>A new <see cref="MessageHandlerFactory{T}"/> bound to this server.</returns>
    protected IMessageHandlerFactory CreateMessageHandlerFactory<T>(Func<IMessage<T>, object> processMessageFn, Action<IMessageHandler, IMessage<T>, Exception>? processExceptionEx)
    {
        return new MessageHandlerFactory<T>(this, processMessageFn, processExceptionEx)
        {
            RequestFilter = RequestFilter,
            ResponseFilter = ResponseFilter,
            RetryCount = RetryCount,
            PublishResponsesWhitelist = PublishResponsesWhitelist,
            PublishToOutqWhitelist = PublishToOutqWhitelist,
        };
    }

    // Typed view of MessageFactory; throws the same exception as that getter when the field is null.
    private ServiceBusMqMessageFactory ServiceBusMessageFactory => (ServiceBusMqMessageFactory)MessageFactory;

    /// <summary>
    /// Passes the registered handlers and their thread counts to the message factory's StartQueues.
    /// </summary>
    public void Start()
    {
        // Create the queues (if they don't exist) and start the listeners
        ServiceBusMessageFactory.StartQueues(this.handlerMap, this.handlerThreadCountMap);
    }

    /// <summary>
    /// Stops the queues on the underlying message factory.
    /// </summary>
    public void Stop()
    {
        ServiceBusMessageFactory.StopQueues();
    }
}