From a8765d1554cf60fc9929bdf44fa3c61aeb6a5790 Mon Sep 17 00:00:00 2001 From: eduard93 Date: Mon, 9 Oct 2017 22:59:28 +0300 Subject: [PATCH] Initial code --- RabbitMQ/Common.cls | 79 ++++++++++++++++++++++++ RabbitMQ/InboundAdapter.cls | 112 +++++++++++++++++++++++++++++++++++ RabbitMQ/Message.cls | 91 ++++++++++++++++++++++++++++ RabbitMQ/Operation.cls | 16 +++++ RabbitMQ/OutboundAdapter.cls | 79 ++++++++++++++++++++++++ RabbitMQ/Production.cls | 32 ++++++++++ RabbitMQ/Service.cls | 14 +++++ sc-list.txt | 1 + 8 files changed, 424 insertions(+) create mode 100644 RabbitMQ/Common.cls create mode 100644 RabbitMQ/InboundAdapter.cls create mode 100644 RabbitMQ/Message.cls create mode 100644 RabbitMQ/Operation.cls create mode 100644 RabbitMQ/OutboundAdapter.cls create mode 100644 RabbitMQ/Production.cls create mode 100644 RabbitMQ/Service.cls create mode 100644 sc-list.txt diff --git a/RabbitMQ/Common.cls b/RabbitMQ/Common.cls new file mode 100644 index 0000000..3ed7013 --- /dev/null +++ b/RabbitMQ/Common.cls @@ -0,0 +1,79 @@ +Class RabbitMQ.Common Extends %RegisteredObject [ Abstract ] +{ + +/// This is the ID name of the set of credentials values (Username, Password) to be used to access the HTTP server +/// Property Credentials As %String [ InitialExpression = "None" ]; +Property Host As %String [ InitialExpression = "localhost" ]; + +Property Port As %Integer [ InitialExpression = -1 ]; + +Property VirtualHost As %String [ InitialExpression = "/" ]; + +Property Queue As %String; + +/// Config Name of the Java Gateway service controlling the Java Gateway server this Operation will use. +Property JGService As %String; + +/// Gateway connection +Property JGW As %Net.Remote.Gateway; + +/// API object +Property API As isc.rabbitmq.API; + +/// Encoding to convert message body. Leave empty to get/send as is. +Property Encoding As %String; + +/// CLASSPATH containing the files required to be passed as an argument when starting the JVM. +/// The user should typically provide here the files containing the classes used via the Java Gateway. +/// We assume that the user has properly quoted the classpath and supplied the correct separators for the platform +/// in case of multiple files.
+/// See property AdditionalPaths in that class. +Property ClassPath As %String(MAXLEN = 32000); + +/// These are the production settings for this object +Parameter SETTINGS = "Host:Basic,Port:Basic,VirtualHost:Basic,Queue:Basic,Credentials:Basic:credentialsSelector,JGService:Basic:selector?context={Ens.ContextSearch/ProductionItems?targets=0&productionName=@productionId},ClassPath:Basic,Encoding:Basic"; + +/// Connect to running JGW +Method Connect() As %Status +{ + // connect to current namespace, use 2 second timeout + Set sc = $$$OK + Set timeout = 5 + Set classPath = ##class(%ListOfDataTypes).%New() + Do classPath.Insert(..ClassPath) + + // get a connection handle and connect + Set gateway = ##class(%Net.Remote.Gateway).%New() + Set host = ##class(Ens.Director).GetHostSettingValue(..JGService, "Address") + Set port = ##class(Ens.Director).GetHostSettingValue(..JGService, "Port") + Set sc = gateway.%Connect(host, port, $namespace, timeout, classPath) + + If $$$ISOK(sc) { + Set ..JGW = gateway + } + Quit sc +} + +Method ConnectToRabbitMQ() As %Status +{ + Set sc = $$$OK + + If ..%CredentialsObj.Username'="" { + Set user = ..%CredentialsObj.Username + Set pass = ..%CredentialsObj.Password + } Else { + Set user = "guest" + Set pass = "guest" + } + + Try { + Set ..API = ##class(isc.rabbitmq.API).%New(..JGW, ..Host, ..Port, user, pass, ..VirtualHost, ..Queue) + } Catch ex { + Set sc = $$$ADDSC(ex.AsStatus(),$g(%objlasterror)) + } + + Quit sc +} + +} + diff --git a/RabbitMQ/InboundAdapter.cls b/RabbitMQ/InboundAdapter.cls new file mode 100644 index 0000000..65c31b8 --- /dev/null +++ b/RabbitMQ/InboundAdapter.cls @@ -0,0 +1,112 @@ +Class RabbitMQ.InboundAdapter Extends (Ens.InboundAdapter, RabbitMQ.Common) +{ + +/// Stream class to store message body. Leave empty to use strings. +Property BodyClass As %Dictionary.CacheClassname; + +Parameter SETTINGS = "BodyClass:Basic"; + +/// Establish gateway connectionand init java API +Method OnInit() As %Status +{ + Set sc = $$$OK + Quit:..JGService="" $$$ERROR($$$GeneralError,"Specify JGService setting") + Quit:'##class(Ens.Director).IsItemEnabled(..JGService) $$$ERROR($$$GeneralError, $$$FormatText("Java Gateway Service: '%1' is down",..JGService)) + Set sc = ..Connect() + Quit:$$$ISERR(sc) + Set sc = ..ConnectToRabbitMQ() + Quit sc +} + +/// Close connection +Method OnTearDown() As %Status +{ + Do ..API.close() + Quit $$$OK +} + +/// default InboundAdapter behavior: always call ProcessInput on CallInterval +Method OnTask() As %Status +{ + Set sc = $$$OK + + Set messageCount = 1 + + While messageCount > 0 { + #Dim messageList As %ListOfDataTypes + + If ..BodyClass = "" { + Set messageList = ..API.readMessageString() + } Else { + Set tempStream = ..GetTempStream() + Set messageList = ..API.readMessageStream(.tempStream) + } + + Set messageLength = messageList.GetAt(1) + Set messageCount = messageList.GetAt(2) + + If messageLength>0 { + #Dim message As RabbitMQ.Message + Set message = ..ListToMessage(messageList) + If ..BodyClass = "" { + Set message.Body = ..DecodeMessageBody(messageList.GetAt(16)) + } Else { + Set message.Body = $classmethod(..BodyClass, "%New") + Do message.Body.Write(..DecodeMessageBody(tempStream.Read(messageLength))) + Do message.Body.Rewind() + } + Set sc = ..BusinessHost.ProcessInput(message) + } Else { + CONTINUE + } + Quit:$$$ISERR(sc) + } + Set ..BusinessHost.%WaitForNextCallInterval=1 + Quit sc +} + +ClassMethod ListToMessage(list As %ListOfDataTypes) As RabbitMQ.Message +{ + Set message = ##class(RabbitMQ.Message).%New() + + Set message.ContentType = list.GetAt(3) + Set message.ContentEncoding = list.GetAt(4) + Set message.CorrelationId = list.GetAt(5) + Set message.ReplyTo = list.GetAt(6) + Set message.Expiration = list.GetAt(7) + Set message.MessageId = list.GetAt(8) + Set message.Type = list.GetAt(9) + Set message.UserId = list.GetAt(10) + Set message.AppId = list.GetAt(11) + Set message.ClusterId = list.GetAt(12) + Set message.DeliveryMode = list.GetAt(13) + Set message.Priority = list.GetAt(14) + Set message.Timestamp = list.GetAt(15) + + Quit message +} + +Method DecodeMessageBody(body As %String) As %String +{ + If ..Encoding '= "" { + If $isObject(body) { + // TODO streams + } Else { + Set body = $zcvt(body, "O", ..Encoding) + } + } + Quit body +} + +ClassMethod GetTempStream() As %GlobalBinaryStream +{ + Set stream=##class(%GlobalBinaryStream).%New() + // TODO - work around that + // we need to 'reserve' a number of bytes since we are passing the stream + // by reference (Java's equivalent is byte[] ba = new byte[max];) + For i=1:1:32000 Do stream.Write("0") + Quit stream +} + +} + diff --git a/RabbitMQ/Message.cls b/RabbitMQ/Message.cls new file mode 100644 index 0000000..c02c95f --- /dev/null +++ b/RabbitMQ/Message.cls @@ -0,0 +1,91 @@ +Class RabbitMQ.Message Extends %Persistent +{ + +Property ContentType As %String; + +Property ContentEncoding As %String; + +Property CorrelationId As %String; + +Property ReplyTo As %String; + +Property Expiration As %String; + +Property MessageId As %String; + +Property Type As %String; + +Property UserId As %String; + +Property AppId As %String; + +Property ClusterId As %String; + +Property DeliveryMode As %String; + +Property Priority As %String; + +Property Timestamp As %String; + +/// Could be either string or stream +Property Body As %String; + +Storage Default +{ + + +%%CLASSNAME + + +ContentType + + +ContentEncoding + + +CorrelationId + + +ReplyTo + + +Expiration + + +MessageId + + +Type + + +UserId + + +AppId + + +ClusterId + + +DeliveryMode + + +Priority + + +Timestamp + + +Body + + +^RabbitMQ.MessageD +MessageDefaultData +^RabbitMQ.MessageD +^RabbitMQ.MessageI +^RabbitMQ.MessageS +%Library.CacheStorage +} + +} + diff --git a/RabbitMQ/Operation.cls b/RabbitMQ/Operation.cls new file mode 100644 index 0000000..41135b7 --- /dev/null +++ b/RabbitMQ/Operation.cls @@ -0,0 +1,16 @@ +Class RabbitMQ.Operation Extends Ens.BusinessOperation +{ + +Parameter ADAPTER = "RabbitMQ.OutboundAdapter"; + +Property Adapter As RabbitMQ.OutboundAdapter; + +Method OnMessage(request As Ens.StringRequest, response As Ens.Response) As %Status +{ + #Dim sc As %Status = $$$OK + Set response = ##class(Ens.Response).%New() + quit ..Adapter.SendMessageToQueue("Hello", request.StringValue) +} + +} + diff --git a/RabbitMQ/OutboundAdapter.cls b/RabbitMQ/OutboundAdapter.cls new file mode 100644 index 0000000..2a82be5 --- /dev/null +++ b/RabbitMQ/OutboundAdapter.cls @@ -0,0 +1,79 @@ +Class RabbitMQ.OutboundAdapter Extends (Ens.OutboundAdapter, RabbitMQ.Common) +{ + +Method OnInit() As %Status +{ + Set sc = $$$OK + Quit:..JGService="" $$$ERROR($$$GeneralError,"Specify JGService setting") + Quit:'##class(Ens.Director).IsItemEnabled(..JGService) $$$ERROR($$$GeneralError, $$$FormatText("Java Gateway Service: '%1' is down",..JGService)) + Set sc = ..Connect() + Quit:$$$ISERR(sc) sc + Set sc = ..ConnectToRabbitMQ() + Quit sc +} + +/// Close connection +Method OnTearDown() As %Status +{ + Do ..API.close() + Quit $$$OK +} + +/// Send message. message can be a string or stream. +Method SendMessage(message As %Stream.Object) As %Status +{ + Set sc = $$$OK + Set stream = ##class(%Library.GlobalBinaryStream).%New() + + If $isObject(message) { + While 'message.AtEnd { + Do stream.Write(..EncodeMessageBody(message.Read($$$MaxStringLength))) + } + } Else { + Do stream.Write(..EncodeMessageBody(message)) + } + + Try { + Do ..API.sendMessage(stream) + } Catch ex { + Set sc = ex.AsStatus() + } + Quit sc +} + +/// Send message. message can be a string or stream. +Method SendMessageToQueue(queue As %String, message As %Stream.Object) As %Status +{ + Set sc = $$$OK + Set stream = ##class(%Library.GlobalBinaryStream).%New() + + If $isObject(message) { + While 'message.AtEnd { + Do stream.Write(..EncodeMessageBody(message.Read($$$MaxStringLength))) + } + } Else { + Do stream.Write(..EncodeMessageBody(message)) + } + + Try { + Do ..API.sendMessageToQueue(queue, stream) + } Catch ex { + Set sc = ex.AsStatus() + } + Quit sc +} + +Method EncodeMessageBody(body As %String) As %String +{ + If ..Encoding '= "" { + If $isObject(body) { + // TODO streams + } Else { + Set body = $zcvt(body, "O", ..Encoding) + } + } + Quit body +} + +} + diff --git a/RabbitMQ/Production.cls b/RabbitMQ/Production.cls new file mode 100644 index 0000000..a41d1f8 --- /dev/null +++ b/RabbitMQ/Production.cls @@ -0,0 +1,32 @@ +Class RabbitMQ.Production Extends Ens.Production +{ + +XData ProductionDefinition +{ + + + 2 + + EnsLib.JavaGateway.Service + C:\InterSystems\RabbitMQjava.jar + hello + + + + + 55559 + JDK18 + C:\Progra~1\Java\jdk1.8.0_121\ + C:\InterSystems\RabbitMQjava.jar + + + EnsLib.JavaGateway.Service + hello + C:\InterSystems\RabbitMQjava.jar + UTF8 + + +} + +} + diff --git a/RabbitMQ/Service.cls b/RabbitMQ/Service.cls new file mode 100644 index 0000000..f0e8cb6 --- /dev/null +++ b/RabbitMQ/Service.cls @@ -0,0 +1,14 @@ +Class RabbitMQ.Service Extends Ens.BusinessService +{ + +Parameter ADAPTER = "RabbitMQ.InboundAdapter"; + +Property Adapter As RabbitMQ.InboundAdapter; + +Method OnProcessInput(message As RabbitMQ.Message) As %Status +{ + quit message.%Save() +} + +} + diff --git a/sc-list.txt b/sc-list.txt new file mode 100644 index 0000000..d7d43fc --- /dev/null +++ b/sc-list.txt @@ -0,0 +1 @@ +RabbitMQ.pkg