Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: objectscript/RabbitMQ-Ensemble-javaapi
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: v0.3.1
Choose a base ref
...
head repository: objectscript/RabbitMQ-Ensemble-javaapi
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: master
Choose a head ref
  • 7 commits
  • 1 file changed
  • 1 contributor

Commits on Apr 13, 2018

  1. Catch null exchange value

    eduard93 committed Apr 13, 2018
    Copy the full SHA
    cd27027 View commit details

Commits on Sep 11, 2018

  1. Update API.java

    eduard93 authored Sep 11, 2018
    Copy the full SHA
    d8628ae View commit details

Commits on Sep 18, 2018

  1. Allow Content-Type

    eduard93 committed Sep 18, 2018
    Copy the full SHA
    6dd432b View commit details

Commits on Sep 19, 2018

  1. Always allow Content-Type

    eduard93 committed Sep 19, 2018
    Copy the full SHA
    8abe915 View commit details

Commits on Nov 14, 2018

  1. Added isOpen method

    eduard93 committed Nov 14, 2018
    Copy the full SHA
    a086d29 View commit details

Commits on Sep 7, 2019

  1. Delivery mode - persistent

    sendMessage split into sendMessage and sendMessageId
    sendMessageToQueue split into sendMessageToQueue and sendMessageToQueueId
    
    Because of an overload bug in 2019.1.0
    eduard93 committed Sep 7, 2019
    Copy the full SHA
    bd1ba37 View commit details

Commits on Dec 9, 2020

  1. Safer close, heartbeat

    eduard93 committed Dec 9, 2020
    Copy the full SHA
    5921dee View commit details
Showing with 72 additions and 19 deletions.
  1. +72 −19 src/isc/rabbitmq/API.java
91 changes: 72 additions & 19 deletions src/isc/rabbitmq/API.java
Original file line number Diff line number Diff line change
@@ -12,34 +12,50 @@
* Created by eduard on 06.10.2017.
*/
public class API {
private Channel _channel;
private com.rabbitmq.client.Channel _channel;

private final String _queue;

private final String _exchange;

private final Connection _connection;

public String ContentType;

public API(String host, int port, String user, String pass, String virtualHost, String queue, int durable) throws Exception {
this(host, port, user, pass, virtualHost, queue, durable, "");
}

public API(String host, int port, String user, String pass, String virtualHost, String queue, int durable, String exchange) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(user);
factory.setPassword(pass);
factory.setVirtualHost(virtualHost);

if (host.toLowerCase().startsWith("amqp://")) {
// we got URI connection string
factory.setUri(host);
} else{
factory.setHost(host);
factory.setPort(port);
factory.setUsername(user);
factory.setPassword(pass);
factory.setVirtualHost(virtualHost);
}

//factory.setAutomaticRecoveryEnabled(true);
factory.setRequestedHeartbeat(0);


_connection = factory.newConnection();

_channel = _connection.createChannel();
try {
// Check that queue exists
// Method throws exception if queue does not exist or is exclusive
// Correct exception text: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'queue'
com.rabbitmq.client.AMQP.Queue.DeclareOk declareOk = _channel.queueDeclarePassive(queue);
// Do we need to declare queue?
// No if we're sending by exchange/routing_key
if (exchange != null && !exchange.isEmpty()) {
// Check that queue exists
// Method throws exception if queue does not exist or is exclusive
// Correct exception text: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'queue'
AMQP.Queue.DeclareOk declareOk = _channel.queueDeclarePassive(queue);
}
} catch (java.io.IOException ex) {
// Exception closes the channel.
// So we need to create new one.
@@ -58,23 +74,49 @@ public API(String host, int port, String user, String pass, String virtualHost,

}

if (exchange != null) {
_exchange = exchange;
try {
AMQP.Exchange.DeclareOk declareOk = _channel.exchangeDeclarePassive(exchange);
} catch (java.io.IOException ex) {
// Exception closes the channel.
// So we need to create new one.
// _channel.basicRecover() doesn't do the trick
_channel = _connection.createChannel();

Boolean durableBool = (durable != 0);
Boolean autoDelete = false;
Boolean passive = false;
// exchange - name of the exchange
// type - direct, topic, fanout, headers. See https://lostechies.com/derekgreer/2012/03/28/rabbitmq-for-windows-exchange-types/
// passive - if true, works the same as exchangeDeclarePassive
// durable - true if we are declaring a durable exchange (the exchange will survive a server restart)
// autoDelete - true if we are declaring an autodelete exchange (server will delete it when no longer in use)
// arguments - other properties (construction arguments) for the exchange

AMQP.Exchange.DeclareOk declareOk = _channel.exchangeDeclare(exchange, "direct", passive, durableBool, autoDelete, null); // , exclusive, autoDelete, null
}
} else {
_exchange = "";
}

_queue = queue;
_exchange = exchange;
//_exchange = exchange != null ? exchange : "";
}

public void sendMessage(byte[] msg, String correlationId, String messageId) throws Exception {
sendMessageToQueue(_queue, msg, correlationId, messageId);
public void sendMessageId(byte[] msg, String correlationId, String messageId) throws Exception {
sendMessageToQueueId(_queue, msg, correlationId, messageId);
}

public void sendMessage(byte[] msg) throws Exception {
sendMessageToQueue(_queue, msg);
}

public void sendMessageToQueue(String queue, byte[] msg) throws Exception {
_channel.basicPublish(_exchange, queue, null, msg);
sendMessageToQueueId(queue, msg, null, null);
}

public void sendMessageToQueue(String queue, byte[] msg, String correlationId, String messageId) throws Exception {
public void sendMessageToQueueId(String queue, byte[] msg, String correlationId, String messageId) throws Exception {
AMQP.BasicProperties props = createProperties(correlationId, messageId);
_channel.basicPublish(_exchange, queue, props, msg);
}
@@ -142,17 +184,27 @@ private GetResponse readMessage(String[] msg) throws IOException {

}

public Boolean isOpen()
{
return _connection !=null ? _connection.isOpen() : false;
}

public void close()throws Exception {
_channel.close();
_connection.close();
try {
_channel.close();
} catch ( Exception ex) {}

try {
_connection.close();
} catch ( Exception ex) {}
}

private AMQP.BasicProperties createProperties(String correlationId, String messageId) throws Exception
{
String contentType = null;
String contentType = ContentType;
String contentEncoding = null;
HashMap<String, Object> headers = null;
Integer deliveryMode = null;
Integer deliveryMode = Integer.valueOf(2);
Integer priority = null;
//String correlationId= null;
String replyTo = null;
@@ -163,6 +215,7 @@ private AMQP.BasicProperties createProperties(String correlationId, String messa
String userId= null;
String appId = null;
String clusterId= null;

AMQP.BasicProperties props = new AMQP.BasicProperties(contentType, contentEncoding, headers, deliveryMode, priority, correlationId, replyTo, expiration, messageId, timestamp, type, userId, appId, clusterId);
return props;
}