From 8f55d09f6ee78afa01b113d48e0e8e56e206ce2f Mon Sep 17 00:00:00 2001 From: eduard93 Date: Thu, 11 Jan 2018 19:50:53 +0300 Subject: [PATCH] Added optional message properties: correlationId, messageId For https://github.com/intersystems-ru/RabbitMQ-Ensemble-javaapi/issues/1 --- src/isc/rabbitmq/API.java | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/src/isc/rabbitmq/API.java b/src/isc/rabbitmq/API.java index e57eb02..de57d04 100644 --- a/src/isc/rabbitmq/API.java +++ b/src/isc/rabbitmq/API.java @@ -5,6 +5,10 @@ import java.io.IOException; import java.nio.file.*; + +import java.util.Date; +import java.util.HashMap; + /** * Created by eduard on 06.10.2017. */ @@ -30,6 +34,10 @@ public API(String host, int port, String user, String pass, String virtualHost, _queue = queue; } + public void sendMessage(byte[] msg, String correlationId, String messageId) throws Exception { + sendMessageToQueue(_queue, msg, correlationId, messageId); + } + public void sendMessage(byte[] msg) throws Exception { sendMessageToQueue(_queue, msg); } @@ -39,6 +47,11 @@ public void sendMessageToQueue(String queue, byte[] msg) throws Exception { _channel.basicPublish("", queue, null, msg); } + public void sendMessageToQueue(String queue, byte[] msg, String correlationId, String messageId) throws Exception { + AMQP.BasicProperties props = createProperties(correlationId, messageId); + _channel.basicPublish("", queue, props, msg); + } + public byte[] readMessageStream(String[] result) throws Exception { GetResponse response = readMessage(result); if (response == null) { @@ -102,4 +115,24 @@ public void close()throws Exception { _connection.close(); } + private AMQP.BasicProperties createProperties(String correlationId, String messageId) throws Exception + { + String contentType = null; + String contentEncoding = null; + HashMap headers = null; + Integer deliveryMode = null; + Integer priority = null; + //String correlationId= null; + String replyTo = null; + String expiration= null; + //String messageId= null; + Date timestamp= null; + String type = null; + String userId= null; + String appId = null; + String clusterId= null; + AMQP.BasicProperties props = new AMQP.BasicProperties(contentType, contentEncoding, headers, deliveryMode, priority, correlationId, replyTo, expiration, messageId, timestamp, type, userId, appId, clusterId); + return props; + } + }