diff --git a/RabbitMQ/Operation.cls b/RabbitMQ/Operation.cls deleted file mode 100644 index e1c8c19..0000000 --- a/RabbitMQ/Operation.cls +++ /dev/null @@ -1,17 +0,0 @@ -Class RabbitMQ.Operation Extends Ens.BusinessOperation -{ - -Parameter ADAPTER = "RabbitMQ.OutboundAdapter"; - -Property Adapter As RabbitMQ.OutboundAdapter; - -Method OnMessage(request As Ens.StringRequest, response As Ens.Response) As %Status -{ - #Dim sc As %Status = $$$OK - Set response = ##class(Ens.Response).%New() - Set sc = ..Adapter.SendMessage(request.StringValue) - Quit sc -} - -} - diff --git a/RabbitMQ/Service.cls b/RabbitMQ/Service.cls deleted file mode 100644 index f0e8cb6..0000000 --- a/RabbitMQ/Service.cls +++ /dev/null @@ -1,14 +0,0 @@ -Class RabbitMQ.Service Extends Ens.BusinessService -{ - -Parameter ADAPTER = "RabbitMQ.InboundAdapter"; - -Property Adapter As RabbitMQ.InboundAdapter; - -Method OnProcessInput(message As RabbitMQ.Message) As %Status -{ - quit message.%Save() -} - -} - diff --git a/RabbitMQ/Common.cls b/isc/rabbitmq/Common.cls similarity index 58% rename from RabbitMQ/Common.cls rename to isc/rabbitmq/Common.cls index 5315475..25dabf8 100644 --- a/RabbitMQ/Common.cls +++ b/isc/rabbitmq/Common.cls @@ -1,22 +1,16 @@ -Class RabbitMQ.Common Extends %RegisteredObject [ Abstract ] +Class isc.rabbitmq.Common Extends %RegisteredObject [ Abstract ] { -/// This is the ID name of the set of credentials values (Username, Password) to be used to access the HTTP server -/// Property Credentials As %String [ InitialExpression = "None" ]; -Property Host As %String [ InitialExpression = "localhost" ]; +/// Either host or Connection URI: amqp:\\ +/// If Connection URI is passed, then Port, Credentials and Virtual host properties are ignored. +Property Host As %VarString [ InitialExpression = "localhost" ]; Property Port As %Integer [ InitialExpression = -1 ]; Property VirtualHost As %String [ InitialExpression = "/" ]; -/// Outbound adapter only (ignored for inbound adapter) -/// If Exchange is not set, then Queue name. -/// If Exchange is set, then Routing key. Property Queue As %String; -/// Exchange name. Optional, empty by default. -Property Exchange As %String; - /// Config Name of the Java Gateway service controlling the Java Gateway server this item will use. /// Alternatively use JGHost and JGPort Settings, to specify Java gateway outside of Ensemble scope. Property JGService As %String; @@ -45,18 +39,8 @@ Property Encoding As %String; /// See property AdditionalPaths in that class. Property ClassPath As %String(MAXLEN = 32000); -/// How many times have we tried reconnecting -/// empty - do not retry -/// 0 - retry ad infinitum -/// n - retry n times -Property RetryCount As %Integer [ InitialExpression = 5 ]; - -/// How frequently to retry access to the output system. -/// Pause in seconds between retry attempts. -Property RetryInterval As %Numeric(MINVAL = 0) [ InitialExpression = 5 ]; - /// These are the production settings for this object -Parameter SETTINGS = "Host:Basic,Port:Basic,VirtualHost:Basic,Queue:Basic,Credentials:Basic:credentialsSelector,JGHost:Java Gateway,JGPort:Java Gateway,JGService:Java Gateway:selector?context={Ens.ContextSearch/ProductionItems?targets=0&productionName=@productionId},ClassPath:Basic,Encoding:Basic,RetryCount:Alerting,RetryInterval:Alerting"; +Parameter SETTINGS = "Host:Basic,Port:Basic,VirtualHost:Basic,Queue:Basic,Credentials:Basic:credentialsSelector,JGHost:Java Gateway,JGPort:Java Gateway,JGService:Java Gateway:selector?context={Ens.ContextSearch/ProductionItems?targets=0&productionName=@productionId},ClassPath:Basic,Encoding:Basic"; /// Connect to running JGW Method Connect() As %Status @@ -97,44 +81,14 @@ Method ConnectToRabbitMQ() As %Status Set pass = "guest" } - Set port = $select(..Port="":-1, 1:..Port) - Try { - Set ..API = ##class(isc.rabbitmq.API).%New(..JGW, ..Host, port, user, pass, ..VirtualHost, ..Queue, $$$YES, ..Exchange) - } Catch ex { + Set ..API = ##class(isc.rabbitmq.API).%New(..JGW, ..Host, ..Port, user, pass, ..VirtualHost, ..Queue, $$$YES) + } Catch ex { Set sc = $$$ADDSC(ex.AsStatus(),$g(%objlasterror)) } Quit sc } -Method IsOpen() As %Status -{ - #Dim sc As %Status = $$$OK - Set retryCount = 1 - Try { - While '..API.isOpen() { - If ..RetryCount = "" { - Set sc = $$$ERROR($$$GeneralError, "Connection problems. Consider specifying RetryCount and RetryInterval settings") - } ElseIf ((..RetryCount = 0) || (retryCount < ..RetryCount)) { - // wait and retry connecting - Set retryCount = retryCount + 1 - Hang ..RetryInterval - - // reconnect happens in isOpen method - } Else { - // we're out of reconnect attempts - Set sc = $$$ERROR($$$GeneralError, $$$FormatText("Connection still closed after %1 attempts at reconnecting.", ..RetryCount)) - } - Quit:$$$ISERR(sc) - } - } Catch ex { - #Dim ex As %Exception.General - Set sc = ex.AsStatus() - } - - Quit sc -} - } diff --git a/RabbitMQ/InboundAdapter.cls b/isc/rabbitmq/InboundAdapter.cls similarity index 75% rename from RabbitMQ/InboundAdapter.cls rename to isc/rabbitmq/InboundAdapter.cls index eae0227..f8a367e 100644 --- a/RabbitMQ/InboundAdapter.cls +++ b/isc/rabbitmq/InboundAdapter.cls @@ -1,4 +1,4 @@ -Class RabbitMQ.InboundAdapter Extends (Ens.InboundAdapter, RabbitMQ.Common) +Class isc.rabbitmq.InboundAdapter Extends (Ens.InboundAdapter, isc.rabbitmq.Common) { /// Stream class to store message body. Leave empty to use strings. @@ -6,6 +6,13 @@ Property BodyClass As %Dictionary.CacheClassname; Parameter SETTINGS = "BodyClass:Basic"; +ClassMethod BodyClassIsValid(val) As %Status +{ + quit:val="" $$$OK + quit:val="%Stream.GlobalCharacter" $$$OK + quit $$$ERROR($$$GeneralError, "No") +} + /// Establish gateway connection and init java API. Method OnInit() As %Status { @@ -28,43 +35,28 @@ Method OnTearDown() As %Status /// Get Messages from RabbitMQ queue. Method OnTask() As %Status { - #Dim sc As %Status = $$$OK + Set sc = $$$OK Set messageCount = 1 While messageCount > 0 { - Set sc = ..IsOpen() - Quit:$$$ISERR(sc) - // List containing metainformation and possibly body (in the case of string interaction) of the RabbitMQ message #Dim messageList As %ListOfDataTypes - Set messageList = ##class(%ListOfDataTypes).%New() - For i=1:1:15 Do messageList.Insert("") - - Try { - If ..BodyClass = "" { - Set messageList = ..API.readMessageString() - } Else { - #Dim tempStream As %Library.GlobalBinaryStream - Set tempStream = ..API.readMessageStream(.messageList) - } - } Catch ex { - #Dim ex As %Exception.General - If ($ZE["") { - Set sc = ..IsOpen() - Set:$$$ISERR(sc) sc = $$$ADDSC(sc, ex.AsStatus()) - } Else { - Set sc = ex.AsStatus() - } - Quit:$$$ISERR(sc) + If ..BodyClass = "" { + Set messageList = ..API.readMessageString() + } Else { + #Dim tempStream As %Library.GlobalBinaryStream + Set messageList = ##class(%ListOfDataTypes).%New() + For i=1:1:15 Do messageList.Insert("") + Set tempStream = ..API.readMessageStream(.messageList) } Set messageLength = messageList.GetAt(1) Set messageCount = messageList.GetAt(2) If messageLength>0 { - #Dim message As RabbitMQ.Message + #Dim message As isc.rabbitmq.Message Set message = ..ListToMessage(messageList) If ..BodyClass = "" { Set message.BodyString = ..DecodeMessageBody(messageList.GetAt(16)) @@ -86,9 +78,9 @@ Method OnTask() As %Status } /// Convert list containing metainformation into RabbitMQ message -ClassMethod ListToMessage(list As %ListOfDataTypes) As RabbitMQ.Message +ClassMethod ListToMessage(list As %ListOfDataTypes) As isc.rabbitmq.Message { - Set message = ##class(RabbitMQ.Message).%New() + Set message = ##class(isc.rabbitmq.Message).%New() Set message.ContentType = list.GetAt(3) Set message.ContentEncoding = list.GetAt(4) diff --git a/RabbitMQ/Message.cls b/isc/rabbitmq/Message.cls similarity index 87% rename from RabbitMQ/Message.cls rename to isc/rabbitmq/Message.cls index a6f6bb1..c08e47f 100644 --- a/RabbitMQ/Message.cls +++ b/isc/rabbitmq/Message.cls @@ -1,4 +1,4 @@ -Class RabbitMQ.Message Extends %Persistent +Class isc.rabbitmq.Message Extends %Persistent { Property ContentType As %String; @@ -96,11 +96,11 @@ Storage Default BodyStream -^RabbitMQ.MessageD +^isc.rabbitmq.MessageD MessageDefaultData -^RabbitMQ.MessageD -^RabbitMQ.MessageI -^RabbitMQ.MessageS +^isc.rabbitmq.MessageD +^isc.rabbitmq.MessageI +^isc.rabbitmq.MessageS %Library.CacheStorage } diff --git a/isc/rabbitmq/Operation.cls b/isc/rabbitmq/Operation.cls new file mode 100644 index 0000000..d913f72 --- /dev/null +++ b/isc/rabbitmq/Operation.cls @@ -0,0 +1,17 @@ +Class isc.rabbitmq.Operation Extends Ens.BusinessOperation +{ + +Parameter ADAPTER = "isc.rabbitmq.OutboundAdapter"; + +Property Adapter As isc.rabbitmq.OutboundAdapter; + +Method OnMessage(request As Ens.StringContainer, response As Ens.Response) As %Status +{ + #Dim sc As %Status = $$$OK + Set response = ##class(Ens.Response).%New() + Set sc = ..Adapter.SendMessage(request.StringValue) + Quit sc +} + +} + diff --git a/RabbitMQ/OutboundAdapter.cls b/isc/rabbitmq/OutboundAdapter.cls similarity index 58% rename from RabbitMQ/OutboundAdapter.cls rename to isc/rabbitmq/OutboundAdapter.cls index 539e567..6e3d002 100644 --- a/RabbitMQ/OutboundAdapter.cls +++ b/isc/rabbitmq/OutboundAdapter.cls @@ -1,8 +1,6 @@ -Class RabbitMQ.OutboundAdapter Extends (Ens.OutboundAdapter, RabbitMQ.Common) +Class isc.rabbitmq.OutboundAdapter Extends (Ens.OutboundAdapter, isc.rabbitmq.Common) { -Parameter SETTINGS = "Exchange:Basic"; - /// Establish gateway connection and init java API. Method OnInit() As %Status { @@ -29,30 +27,21 @@ Method SendMessage(message As %Stream.Object) As %Status Set stream = ##class(%Library.GlobalBinaryStream).%New() If $isObject(message) { - While 'message.AtEnd { - Do stream.Write(..EncodeMessageBody(message.Read($$$MaxStringLength))) + If message.%IsA("%Library.GlobalBinaryStream") { + Set stream = message + } Else { + While 'message.AtEnd { + Do stream.Write(..EncodeMessageBody(message.Read($$$MaxStringLength))) + } } } Else { Do stream.Write(..EncodeMessageBody(message)) } - #Dim attempts As %Integer = 0 - - While attempts<2 { - Set attempts = attempts + 1 - Try { - Do ..API.sendMessage(stream) - Return sc - } Catch ex { - #Dim ex As %Exception.General - If ($ZE["") { - Set sc = ..IsOpen() - Set:$$$ISERR(sc) sc = $$$ADDSC(sc, ex.AsStatus()) - } Else { - Set sc = ex.AsStatus() - } - Return:$$$ISERR(sc) sc // quit if reconnect is unsuccessful or we got unknown exception - } + Try { + Do ..API.sendMessage(stream) + } Catch ex { + Set sc = ex.AsStatus() } Quit sc } @@ -70,24 +59,11 @@ Method SendMessageToQueue(queue As %String, message As %Stream.Object) As %Statu } Else { Do stream.Write(..EncodeMessageBody(message)) } - - #Dim attempts As %Integer = 0 - While attempts<2 { - Set attempts = attempts + 1 - Try { - Do ..API.sendMessageToQueue(queue, stream) - Return sc - } Catch ex { - #Dim ex As %Exception.General - If ($ZE["") { - Set sc = ..IsOpen() - Set:$$$ISERR(sc) sc = $$$ADDSC(sc, ex.AsStatus()) - } Else { - Set sc = ex.AsStatus() - } - Return:$$$ISERR(sc) sc // quit if reconnect is unsuccessful or we got unknown exception - } + Try { + Do ..API.sendMessageToQueue(queue, stream) + } Catch ex { + Set sc = ex.AsStatus() } Quit sc } diff --git a/RabbitMQ/Production.cls b/isc/rabbitmq/Production.cls similarity index 64% rename from RabbitMQ/Production.cls rename to isc/rabbitmq/Production.cls index e3c74ad..95a754f 100644 --- a/RabbitMQ/Production.cls +++ b/isc/rabbitmq/Production.cls @@ -1,14 +1,14 @@ -Class RabbitMQ.Production Extends Ens.Production +Class isc.rabbitmq.Production Extends Ens.Production { XData ProductionDefinition { - + 2 - + EnsLib.JavaGateway.Service - C:\InterSystems\RabbitMQjava.jar + C:\InterSystems\RabbitMQjava.jar;C:\InterSystems\Java\amqp-client-5.7.2.jar hello UTF8 @@ -18,12 +18,12 @@ XData ProductionDefinition 55559 JDK18 C:\Progra~1\Java\jdk1.8.0_121\ - C:\InterSystems\RabbitMQjava.jar + C:\InterSystems\RabbitMQjava.jar;C:\InterSystems\Java\amqp-client-5.7.2.jar - + EnsLib.JavaGateway.Service hello - C:\InterSystems\RabbitMQjava.jar + C:\InterSystems\RabbitMQjava.jar;C:\InterSystems\Java\amqp-client-5.7.2.jar UTF8 diff --git a/isc/rabbitmq/Service.cls b/isc/rabbitmq/Service.cls new file mode 100644 index 0000000..fbb239b --- /dev/null +++ b/isc/rabbitmq/Service.cls @@ -0,0 +1,14 @@ +Class isc.rabbitmq.Service Extends Ens.BusinessService +{ + +Parameter ADAPTER = "isc.rabbitmq.InboundAdapter"; + +Property Adapter As isc.rabbitmq.InboundAdapter; + +Method OnProcessInput(message As isc.rabbitmq.Message) As %Status +{ + quit message.%Save() +} + +} + diff --git a/RabbitMQ/Utils.cls b/isc/rabbitmq/Utils.cls similarity index 65% rename from RabbitMQ/Utils.cls rename to isc/rabbitmq/Utils.cls index 58f2b7c..4ce1224 100644 --- a/RabbitMQ/Utils.cls +++ b/isc/rabbitmq/Utils.cls @@ -1,33 +1,31 @@ /// Various test methods -Class RabbitMQ.Utils +Class isc.rabbitmq.Utils { Parameter CLASS = "isc.rabbitmq.API"; -Parameter CLASSPATH = "C:\InterSystems\RabbitMQjava.jar"; +Parameter CLASSPATH = "C:\InterSystems\Java\RabbitMQjava.jar,C:\InterSystems\Java\amqp-client-5.7.2.jar"; Parameter BUILDCLASSPATH = "D:\Cache\RabbitMQ\java\out\artifacts\RabbitMQjava_jar\RabbitMQjava.jar"; Parameter GATEWAY = "RabbitMQ"; -Parameter QUEUE = "Test"; +Parameter QUEUE = "hello5"; /// Load Jar from path. -/// Write $System.Status.GetErrorText(##class(RabbitMQ.Utils).UpdateJar()) +/// Write $System.Status.GetErrorText(##class(isc.rabbitmq.Utils).UpdateJar()) ClassMethod UpdateJar(gatewayName = {..#GATEWAY}, path As %String = {..#CLASSPATH}) { #Dim sc As %Status = $$$OK - Set sc = ##class(Ens.Director).StopProduction(, 1) + //Set sc = ##class(Ens.Director).StopProduction(, 1) Quit:$$$ISERR(sc) sc - Set sc = ##class(%Net.Remote.Service).StopGateway(gatewayName) - - Set:$system["ed-pc" sc = ##class(%File).CopyFile(..#BUILDCLASSPATH,..#CLASSPATH, $$$YES, .result) - Quit:sc'=1 $$$ERROR($$$GeneralError, $$$FormatText("File copy failed with error: %1", result)) - + //Set sc = ##class(%Net.Remote.Service).StopGateway(gatewayName) + Quit:$$$ISERR(sc) sc + Set gateway = ..Connect(gatewayName, path, .sc) Quit:$$$ISERR(sc) sc - + Set sc = gateway.%Import(..#CLASS) Quit:$$$ISERR(sc) sc Set:'##class(%Dictionary.CompiledClass).%ExistsId(..#CLASS) sc = $$$ERROR($$$GeneralError, $$$FormatText("Class '%1' does not exist",..#CLASS)) @@ -35,12 +33,12 @@ ClassMethod UpdateJar(gatewayName = {..#GATEWAY}, path As %String = {..#CLASSPAT Set sc = ##class(%Net.Remote.Service).StopGateway(gatewayName) - Set sc = ##class(Ens.Director).StartProduction() + //Set sc = ##class(Ens.Director).StartProduction() Quit sc } /// Read one message. -/// Write $System.Status.GetErrorText(##class(RabbitMQ.Utils).ReadMsg()) +/// Write $System.Status.GetErrorText(##class(isc.rabbitmq.Utils).ReadMsg()) ClassMethod ReadMsg(pMsgLen = 32000) As %Status { #Dim gateway as %Net.Remote.Gateway @@ -60,7 +58,8 @@ ClassMethod ReadMsg(pMsgLen = 32000) As %Status Set stream = api.readMessageStream(.list) set body = stream.Read() Write !,"Body: ", body,! - + set ^a = body + Write ! Zw list @@ -73,7 +72,7 @@ ClassMethod ReadMsg(pMsgLen = 32000) As %Status } /// Read one message. -/// Write $System.Status.GetErrorText(##class(RabbitMQ.Utils).ReadMsgString()) +/// Write $System.Status.GetErrorText(##class(isc.rabbitmq.Utils).ReadMsgString()) ClassMethod ReadMsgString(pMsgLen = 32000) As %Status { #Dim gateway as %Net.Remote.Gateway @@ -89,6 +88,7 @@ ClassMethod ReadMsgString(pMsgLen = 32000) As %Status #Dim list As %ListOfDataTypes Set list = api.readMessageString() + set ^dbg = list.GetAt(16) Write !,"Body: ",list.GetAt(16),! Zw list @@ -102,8 +102,38 @@ ClassMethod ReadMsgString(pMsgLen = 32000) As %Status Quit sc } +/// Write $System.Status.GetErrorText(##class(isc.rabbitmq.Utils).ReadMsgBodyString()) +ClassMethod ReadMsgBodyString() As %Status +{ + #Dim gateway as %Net.Remote.Gateway + #Dim exception as %Exception.AbstractException + + Set sc = $$$OK + Try { + + Set gateway = ..Connect() + #Dim api As isc.rabbitmq.API + Set api = ..GetAPI(gateway) + + Set message = api.readMessageBodyString() + + Write "Body: ",message,! + Write "UTF: ",$zcvt(message, "I", "UTF8"),! + Write "CP1251 UTF: ",$zcvt($zcvt(message, "O", "CP1251"), "I", "UTF8"),! + + Zw list + + Set sc= gateway.%Disconnect() + } Catch ex { + break + Set sc = $$$ADDSC(ex.AsStatus(), $g(%objlasterror)) + } + + Quit sc +} + /// Send one message. -/// Write $System.Status.GetErrorText(##class(RabbitMQ.Utils).SendMsg()) +/// Write $System.Status.GetErrorText(##class(isc.rabbitmq.Utils).SendMsg()) ClassMethod SendMsg(msg = "356") As %Status { #dim gateway as %Net.Remote.Gateway @@ -119,12 +149,13 @@ ClassMethod SendMsg(msg = "356") As %Status Set stream = ##class(%GlobalBinaryStream).%New() Do stream.Write(msg) - Do api.sendMessage(stream, "correlationId", "message " _ $zdt($zts,3,1,3)) + Do api.sendMessageId(stream, "correlationId", "message " _ $zdt($zts,3,1,3)) Set sc= gateway.%Disconnect() } Catch ex { + break Set sc = $$$ADDSC(ex.AsStatus(), $g(%objlasterror)) } @@ -141,11 +172,11 @@ ClassMethod Connect(gatewayName As %String = {..#GATEWAY}, path As %String = {.. Quit gateway } -/// Construct RabbitMQ API object. -ClassMethod GetAPI(gateway As %Net.Remote.Gateway) As isc.rabbitmq.API +/// Construct RabbitMQ API object.As isc.rabbitmq.API +ClassMethod GetAPI(gateway As %Net.Remote.Gateway) { Set host = "localhost" - Set port = -1 + Set port = 5672 Set user = "guest" Set pass = "guest" Set virtualHost = "/" diff --git a/sc-list.txt b/sc-list.txt index d7d43fc..e5f843b 100644 --- a/sc-list.txt +++ b/sc-list.txt @@ -1 +1 @@ -RabbitMQ.pkg +isc.rabbitmq.pkg