Skip to content

Commit

Permalink
Move from RabbitMQ to isc.rabbitmq package
Browse files Browse the repository at this point in the history
  • Loading branch information
eduard93 committed Sep 7, 2019
1 parent d016de5 commit b8499b1
Show file tree
Hide file tree
Showing 11 changed files with 136 additions and 183 deletions.
17 changes: 0 additions & 17 deletions RabbitMQ/Operation.cls

This file was deleted.

14 changes: 0 additions & 14 deletions RabbitMQ/Service.cls

This file was deleted.

60 changes: 7 additions & 53 deletions RabbitMQ/Common.cls → isc/rabbitmq/Common.cls
Original file line number Diff line number Diff line change
@@ -1,22 +1,16 @@
Class RabbitMQ.Common Extends %RegisteredObject [ Abstract ]
Class isc.rabbitmq.Common Extends %RegisteredObject [ Abstract ]
{

/// This is the ID name of the set of credentials values (Username, Password) to be used to access the HTTP server
/// Property Credentials As %String [ InitialExpression = "None" ];
Property Host As %String [ InitialExpression = "localhost" ];
/// Either host or Connection URI: amqp:\\
/// If Connection URI is passed, then Port, Credentials and Virtual host properties are ignored.
Property Host As %VarString [ InitialExpression = "localhost" ];

Property Port As %Integer [ InitialExpression = -1 ];

Property VirtualHost As %String [ InitialExpression = "/" ];

/// Outbound adapter only (ignored for inbound adapter)
/// If Exchange is not set, then Queue name.
/// If Exchange is set, then Routing key.
Property Queue As %String;

/// Exchange name. Optional, empty by default.
Property Exchange As %String;

/// Config Name of the Java Gateway service controlling the Java Gateway server this item will use.
/// Alternatively use JGHost and JGPort Settings, to specify Java gateway outside of Ensemble scope.
Property JGService As %String;
Expand Down Expand Up @@ -45,18 +39,8 @@ Property Encoding As %String;
/// See property AdditionalPaths in that class.
Property ClassPath As %String(MAXLEN = 32000);

/// How many times have we tried reconnecting
/// empty - do not retry
/// 0 - retry ad infinitum
/// n - retry n times
Property RetryCount As %Integer [ InitialExpression = 5 ];

/// How frequently to retry access to the output system.
/// Pause in seconds between retry attempts.
Property RetryInterval As %Numeric(MINVAL = 0) [ InitialExpression = 5 ];

/// These are the production settings for this object
Parameter SETTINGS = "Host:Basic,Port:Basic,VirtualHost:Basic,Queue:Basic,Credentials:Basic:credentialsSelector,JGHost:Java Gateway,JGPort:Java Gateway,JGService:Java Gateway:selector?context={Ens.ContextSearch/ProductionItems?targets=0&productionName=@productionId},ClassPath:Basic,Encoding:Basic,RetryCount:Alerting,RetryInterval:Alerting";
Parameter SETTINGS = "Host:Basic,Port:Basic,VirtualHost:Basic,Queue:Basic,Credentials:Basic:credentialsSelector,JGHost:Java Gateway,JGPort:Java Gateway,JGService:Java Gateway:selector?context={Ens.ContextSearch/ProductionItems?targets=0&productionName=@productionId},ClassPath:Basic,Encoding:Basic";

/// Connect to running JGW
Method Connect() As %Status
Expand Down Expand Up @@ -97,44 +81,14 @@ Method ConnectToRabbitMQ() As %Status
Set pass = "guest"
}

Set port = $select(..Port="":-1, 1:..Port)

Try {
Set ..API = ##class(isc.rabbitmq.API).%New(..JGW, ..Host, port, user, pass, ..VirtualHost, ..Queue, $$$YES, ..Exchange)
} Catch ex {
Set ..API = ##class(isc.rabbitmq.API).%New(..JGW, ..Host, ..Port, user, pass, ..VirtualHost, ..Queue, $$$YES)
} Catch ex {
Set sc = $$$ADDSC(ex.AsStatus(),$g(%objlasterror))
}

Quit sc
}

Method IsOpen() As %Status
{
#Dim sc As %Status = $$$OK
Set retryCount = 1
Try {
While '..API.isOpen() {
If ..RetryCount = "" {
Set sc = $$$ERROR($$$GeneralError, "Connection problems. Consider specifying RetryCount and RetryInterval settings")
} ElseIf ((..RetryCount = 0) || (retryCount < ..RetryCount)) {
// wait and retry connecting
Set retryCount = retryCount + 1
Hang ..RetryInterval

// reconnect happens in isOpen method
} Else {
// we're out of reconnect attempts
Set sc = $$$ERROR($$$GeneralError, $$$FormatText("Connection still closed after %1 attempts at reconnecting.", ..RetryCount))
}
Quit:$$$ISERR(sc)
}
} Catch ex {
#Dim ex As %Exception.General
Set sc = ex.AsStatus()
}

Quit sc
}

}

46 changes: 19 additions & 27 deletions RabbitMQ/InboundAdapter.cls → isc/rabbitmq/InboundAdapter.cls
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
Class RabbitMQ.InboundAdapter Extends (Ens.InboundAdapter, RabbitMQ.Common)
Class isc.rabbitmq.InboundAdapter Extends (Ens.InboundAdapter, isc.rabbitmq.Common)
{

/// Stream class to store message body. Leave empty to use strings.
Property BodyClass As %Dictionary.CacheClassname;

Parameter SETTINGS = "BodyClass:Basic";

ClassMethod BodyClassIsValid(val) As %Status
{
quit:val="" $$$OK
quit:val="%Stream.GlobalCharacter" $$$OK
quit $$$ERROR($$$GeneralError, "No")
}

/// Establish gateway connection and init java API.
Method OnInit() As %Status
{
Expand All @@ -28,43 +35,28 @@ Method OnTearDown() As %Status
/// Get Messages from RabbitMQ queue.
Method OnTask() As %Status
{
#Dim sc As %Status = $$$OK
Set sc = $$$OK

Set messageCount = 1

While messageCount > 0 {
Set sc = ..IsOpen()
Quit:$$$ISERR(sc)

// List containing metainformation and possibly body (in the case of string interaction) of the RabbitMQ message
#Dim messageList As %ListOfDataTypes

Set messageList = ##class(%ListOfDataTypes).%New()
For i=1:1:15 Do messageList.Insert("")

Try {
If ..BodyClass = "" {
Set messageList = ..API.readMessageString()
} Else {
#Dim tempStream As %Library.GlobalBinaryStream
Set tempStream = ..API.readMessageStream(.messageList)
}
} Catch ex {
#Dim ex As %Exception.General
If ($ZE["<ZJGTW>") {
Set sc = ..IsOpen()
Set:$$$ISERR(sc) sc = $$$ADDSC(sc, ex.AsStatus())
} Else {
Set sc = ex.AsStatus()
}
Quit:$$$ISERR(sc)
If ..BodyClass = "" {
Set messageList = ..API.readMessageString()
} Else {
#Dim tempStream As %Library.GlobalBinaryStream
Set messageList = ##class(%ListOfDataTypes).%New()
For i=1:1:15 Do messageList.Insert("")
Set tempStream = ..API.readMessageStream(.messageList)
}

Set messageLength = messageList.GetAt(1)
Set messageCount = messageList.GetAt(2)

If messageLength>0 {
#Dim message As RabbitMQ.Message
#Dim message As isc.rabbitmq.Message
Set message = ..ListToMessage(messageList)
If ..BodyClass = "" {
Set message.BodyString = ..DecodeMessageBody(messageList.GetAt(16))
Expand All @@ -86,9 +78,9 @@ Method OnTask() As %Status
}

/// Convert list containing metainformation into RabbitMQ message
ClassMethod ListToMessage(list As %ListOfDataTypes) As RabbitMQ.Message
ClassMethod ListToMessage(list As %ListOfDataTypes) As isc.rabbitmq.Message
{
Set message = ##class(RabbitMQ.Message).%New()
Set message = ##class(isc.rabbitmq.Message).%New()

Set message.ContentType = list.GetAt(3)
Set message.ContentEncoding = list.GetAt(4)
Expand Down
10 changes: 5 additions & 5 deletions RabbitMQ/Message.cls → isc/rabbitmq/Message.cls
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Class RabbitMQ.Message Extends %Persistent
Class isc.rabbitmq.Message Extends %Persistent
{

Property ContentType As %String;
Expand Down Expand Up @@ -96,11 +96,11 @@ Storage Default
<Value>BodyStream</Value>
</Value>
</Data>
<DataLocation>^RabbitMQ.MessageD</DataLocation>
<DataLocation>^isc.rabbitmq.MessageD</DataLocation>
<DefaultData>MessageDefaultData</DefaultData>
<IdLocation>^RabbitMQ.MessageD</IdLocation>
<IndexLocation>^RabbitMQ.MessageI</IndexLocation>
<StreamLocation>^RabbitMQ.MessageS</StreamLocation>
<IdLocation>^isc.rabbitmq.MessageD</IdLocation>
<IndexLocation>^isc.rabbitmq.MessageI</IndexLocation>
<StreamLocation>^isc.rabbitmq.MessageS</StreamLocation>
<Type>%Library.CacheStorage</Type>
}

Expand Down
17 changes: 17 additions & 0 deletions isc/rabbitmq/Operation.cls
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
Class isc.rabbitmq.Operation Extends Ens.BusinessOperation
{

Parameter ADAPTER = "isc.rabbitmq.OutboundAdapter";

Property Adapter As isc.rabbitmq.OutboundAdapter;

Method OnMessage(request As Ens.StringContainer, response As Ens.Response) As %Status
{
#Dim sc As %Status = $$$OK
Set response = ##class(Ens.Response).%New()
Set sc = ..Adapter.SendMessage(request.StringValue)
Quit sc
}

}

54 changes: 15 additions & 39 deletions RabbitMQ/OutboundAdapter.cls → isc/rabbitmq/OutboundAdapter.cls
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
Class RabbitMQ.OutboundAdapter Extends (Ens.OutboundAdapter, RabbitMQ.Common)
Class isc.rabbitmq.OutboundAdapter Extends (Ens.OutboundAdapter, isc.rabbitmq.Common)
{

Parameter SETTINGS = "Exchange:Basic";

/// Establish gateway connection and init java API.
Method OnInit() As %Status
{
Expand All @@ -29,30 +27,21 @@ Method SendMessage(message As %Stream.Object) As %Status
Set stream = ##class(%Library.GlobalBinaryStream).%New()

If $isObject(message) {
While 'message.AtEnd {
Do stream.Write(..EncodeMessageBody(message.Read($$$MaxStringLength)))
If message.%IsA("%Library.GlobalBinaryStream") {
Set stream = message
} Else {
While 'message.AtEnd {
Do stream.Write(..EncodeMessageBody(message.Read($$$MaxStringLength)))
}
}
} Else {
Do stream.Write(..EncodeMessageBody(message))
}

#Dim attempts As %Integer = 0

While attempts<2 {
Set attempts = attempts + 1
Try {
Do ..API.sendMessage(stream)
Return sc
} Catch ex {
#Dim ex As %Exception.General
If ($ZE["<ZJGTW>") {
Set sc = ..IsOpen()
Set:$$$ISERR(sc) sc = $$$ADDSC(sc, ex.AsStatus())
} Else {
Set sc = ex.AsStatus()
}
Return:$$$ISERR(sc) sc // quit if reconnect is unsuccessful or we got unknown exception
}
Try {
Do ..API.sendMessage(stream)
} Catch ex {
Set sc = ex.AsStatus()
}
Quit sc
}
Expand All @@ -70,24 +59,11 @@ Method SendMessageToQueue(queue As %String, message As %Stream.Object) As %Statu
} Else {
Do stream.Write(..EncodeMessageBody(message))
}

#Dim attempts As %Integer = 0

While attempts<2 {
Set attempts = attempts + 1
Try {
Do ..API.sendMessageToQueue(queue, stream)
Return sc
} Catch ex {
#Dim ex As %Exception.General
If ($ZE["<ZJGTW>") {
Set sc = ..IsOpen()
Set:$$$ISERR(sc) sc = $$$ADDSC(sc, ex.AsStatus())
} Else {
Set sc = ex.AsStatus()
}
Return:$$$ISERR(sc) sc // quit if reconnect is unsuccessful or we got unknown exception
}
Try {
Do ..API.sendMessageToQueue(queue, stream)
} Catch ex {
Set sc = ex.AsStatus()
}
Quit sc
}
Expand Down
14 changes: 7 additions & 7 deletions RabbitMQ/Production.cls → isc/rabbitmq/Production.cls
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
Class RabbitMQ.Production Extends Ens.Production
Class isc.rabbitmq.Production Extends Ens.Production
{

XData ProductionDefinition
{
<Production Name="RabbitMQ.Production" TestingEnabled="true" LogGeneralTraceEvents="false">
<Production Name="isc.rabbitmq.Production" TestingEnabled="true" LogGeneralTraceEvents="false">
<Description></Description>
<ActorPoolSize>2</ActorPoolSize>
<Item Name="RabbitMQ.Service" Category="" ClassName="RabbitMQ.Service" PoolSize="1" Enabled="true" Foreground="false" Comment="" LogTraceEvents="true" Schedule="">
<Item Name="isc.rabbitmq.Service" Category="" ClassName="RabbitMQ.Service" PoolSize="1" Enabled="true" Foreground="false" Comment="" LogTraceEvents="true" Schedule="">
<Setting Target="Adapter" Name="JGService">EnsLib.JavaGateway.Service</Setting>
<Setting Target="Adapter" Name="ClassPath">C:\InterSystems\RabbitMQjava.jar</Setting>
<Setting Target="Adapter" Name="ClassPath">C:\InterSystems\RabbitMQjava.jar;C:\InterSystems\Java\amqp-client-5.7.2.jar</Setting>
<Setting Target="Adapter" Name="Queue">hello</Setting>
<Setting Target="Adapter" Name="BodyClass"></Setting>
<Setting Target="Adapter" Name="Encoding">UTF8</Setting>
Expand All @@ -18,12 +18,12 @@ XData ProductionDefinition
<Setting Target="Host" Name="Port">55559</Setting>
<Setting Target="Host" Name="JDKVersion">JDK18</Setting>
<Setting Target="Host" Name="JavaHome">C:\Progra~1\Java\jdk1.8.0_121\</Setting>
<Setting Target="Host" Name="ClassPath">C:\InterSystems\RabbitMQjava.jar</Setting>
<Setting Target="Host" Name="ClassPath">C:\InterSystems\RabbitMQjava.jar;C:\InterSystems\Java\amqp-client-5.7.2.jar</Setting>
</Item>
<Item Name="RabbitMQ.Operation" Category="" ClassName="RabbitMQ.Operation" PoolSize="1" Enabled="true" Foreground="false" Comment="" LogTraceEvents="false" Schedule="">
<Item Name="isc.rabbitmq.Operation" Category="" ClassName="RabbitMQ.Operation" PoolSize="1" Enabled="true" Foreground="false" Comment="" LogTraceEvents="false" Schedule="">
<Setting Target="Adapter" Name="JGService">EnsLib.JavaGateway.Service</Setting>
<Setting Target="Adapter" Name="Queue">hello</Setting>
<Setting Target="Adapter" Name="ClassPath">C:\InterSystems\RabbitMQjava.jar</Setting>
<Setting Target="Adapter" Name="ClassPath">C:\InterSystems\RabbitMQjava.jar;C:\InterSystems\Java\amqp-client-5.7.2.jar</Setting>
<Setting Target="Adapter" Name="Encoding">UTF8</Setting>
</Item>
</Production>
Expand Down
14 changes: 14 additions & 0 deletions isc/rabbitmq/Service.cls
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
Class isc.rabbitmq.Service Extends Ens.BusinessService
{

Parameter ADAPTER = "isc.rabbitmq.InboundAdapter";

Property Adapter As isc.rabbitmq.InboundAdapter;

Method OnProcessInput(message As isc.rabbitmq.Message) As %Status
{
quit message.%Save()
}

}

Loading

0 comments on commit b8499b1

Please sign in to comment.