Skip to content

Commit 46a7d86

Browse files
Andreas Presthammeracogoluegnes
Andreas Presthammer
authored andcommitted
Add RpcClient doCall/responseCall/primitiveCall overloads which include timeout
There will be use cases where a single shared timeout specified in the constructor of the RpcClient will not match up with the expected timeout for different rpc calls made using the rpc client. It's then convenient to be able to specifiy the timeout value on a per method call basis. Otherwise the user of RpcClient would have to create multiple instances of RpcClient, incurring the startup cost and having redundant consumers of the response queue.
1 parent 900c76f commit 46a7d86

File tree

1 file changed

+30
-3
lines changed

1 file changed

+30
-3
lines changed

src/main/java/com/rabbitmq/client/RpcClient.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,11 @@ public void publish(AMQP.BasicProperties props, byte[] message)
209209
}
210210

211211
public Response doCall(AMQP.BasicProperties props, byte[] message)
212+
throws IOException, TimeoutException {
213+
return doCall(props, message, _timeout);
214+
}
215+
216+
public Response doCall(AMQP.BasicProperties props, byte[] message, int timeout)
212217
throws IOException, ShutdownSignalException, TimeoutException {
213218
checkConsumer();
214219
BlockingCell<Object> k = new BlockingCell<Object>();
@@ -220,7 +225,7 @@ public Response doCall(AMQP.BasicProperties props, byte[] message)
220225
_continuationMap.put(replyId, k);
221226
}
222227
publish(props, message);
223-
Object reply = k.uninterruptibleGet(_timeout);
228+
Object reply = k.uninterruptibleGet(timeout);
224229
if (reply instanceof ShutdownSignalException) {
225230
ShutdownSignalException sig = (ShutdownSignalException) reply;
226231
ShutdownSignalException wrapper =
@@ -238,7 +243,13 @@ public Response doCall(AMQP.BasicProperties props, byte[] message)
238243
public byte[] primitiveCall(AMQP.BasicProperties props, byte[] message)
239244
throws IOException, ShutdownSignalException, TimeoutException
240245
{
241-
return doCall(props, message).getBody();
246+
return primitiveCall(props, message, _timeout);
247+
}
248+
249+
public byte[] primitiveCall(AMQP.BasicProperties props, byte[] message, int timeout)
250+
throws IOException, ShutdownSignalException, TimeoutException
251+
{
252+
return doCall(props, message, timeout).getBody();
242253
}
243254

244255
/**
@@ -266,7 +277,23 @@ public byte[] primitiveCall(byte[] message)
266277
* @throws TimeoutException if a response is not received within the configured timeout
267278
*/
268279
public Response responseCall(byte[] message) throws IOException, ShutdownSignalException, TimeoutException {
269-
return doCall(null, message);
280+
return responseCall(message, _timeout);
281+
}
282+
283+
/**
284+
* Perform a simple byte-array-based RPC roundtrip
285+
*
286+
* Useful if you need to get at more than just the body of the message
287+
*
288+
* @param message the byte array request message to send
289+
* @param timeout milliseconds before timing out on wait for response
290+
* @return The response object is an envelope that contains all of the data provided to the `handleDelivery` consumer
291+
* @throws ShutdownSignalException if the connection dies during our wait
292+
* @throws IOException if an error is encountered
293+
* @throws TimeoutException if a response is not received within the configured timeout
294+
*/
295+
public Response responseCall(byte[] message, int timeout) throws IOException, ShutdownSignalException, TimeoutException {
296+
return doCall(null, message, timeout);
270297
}
271298

272299
/**

0 commit comments

Comments
 (0)