Skip to content

Commit 5852d8e

Browse files
author
Michael Bridgen
committed
Merge in bug22083
2 parents 8f0db4a + 28eeec7 commit 5852d8e

File tree

3 files changed

+89
-8
lines changed

3 files changed

+89
-8
lines changed

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -523,9 +523,13 @@ public boolean processControlCommand(Command c)
523523
return false;
524524
} else {
525525
// Quiescing.
526-
if (method instanceof AMQP.Connection.CloseOk) {
527-
// It's our final "RPC".
528-
return false;
526+
if (method instanceof AMQP.Connection.CloseOk) {
527+
// It's our final "RPC". Time to shut down.
528+
_running = false;
529+
// If Close was sent from within the MainLoop we
530+
// will not have a continuation to return to, so
531+
// we treat this as processed in that case.
532+
return _channel0._activeRpc == null;
529533
} else {
530534
// Ignore all others.
531535
return true;
@@ -680,14 +684,21 @@ public void close(int closeCode,
680684
boolean abort)
681685
throws IOException
682686
{
687+
final boolean sync = !(Thread.currentThread() instanceof MainLoop);
688+
683689
try {
684690
AMQImpl.Connection.Close reason =
685691
new AMQImpl.Connection.Close(closeCode, closeMessage, 0, 0);
692+
686693
shutdown(reason, initiatedByApplication, cause, true);
687-
AMQChannel.SimpleBlockingRpcContinuation k =
688-
new AMQChannel.SimpleBlockingRpcContinuation();
689-
_channel0.quiescingRpc(reason, k);
690-
k.getReply(timeout);
694+
if(sync){
695+
AMQChannel.SimpleBlockingRpcContinuation k =
696+
new AMQChannel.SimpleBlockingRpcContinuation();
697+
_channel0.quiescingRpc(reason, k);
698+
k.getReply(timeout);
699+
} else {
700+
_channel0.quiescingTransmit(reason);
701+
}
691702
} catch (TimeoutException tte) {
692703
if (!abort)
693704
throw new ShutdownSignalException(true, true, tte, this);
@@ -698,7 +709,7 @@ public void close(int closeCode,
698709
if (!abort)
699710
throw ioe;
700711
} finally {
701-
_frameHandler.close();
712+
if(sync) _frameHandler.close();
702713
}
703714
}
704715

test/src/com/rabbitmq/client/test/ClientTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public static TestSuite suite() {
4545
suite.addTest(BrokenFramesTest.suite());
4646
suite.addTest(ClonePropertiesTest.suite());
4747
suite.addTestSuite(Bug20004Test.class);
48+
suite.addTestSuite(CloseInMainLoop.class);
4849
return suite;
4950
}
5051
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package com.rabbitmq.client.test;
2+
3+
import com.rabbitmq.client.impl.*;
4+
import com.rabbitmq.client.*;
5+
import java.io.IOException;
6+
import java.util.concurrent.atomic.AtomicBoolean;
7+
import java.util.concurrent.CountDownLatch;
8+
9+
import javax.net.SocketFactory;
10+
11+
public class CloseInMainLoop extends BrokerTestCase{
12+
class SpecialConnection extends AMQConnection{
13+
private AtomicBoolean validShutdown = new AtomicBoolean();
14+
15+
public boolean hadValidShutdown(){
16+
if(isOpen()) throw new IllegalStateException("hadValidShutdown called while connection is still open");
17+
return validShutdown.get();
18+
}
19+
20+
public SpecialConnection() throws Exception{
21+
super(new ConnectionParameters(), new SocketFrameHandler(SocketFactory.getDefault().createSocket("localhost", 5672)));
22+
this.start(true);
23+
}
24+
25+
@Override
26+
public boolean processControlCommand(Command c) throws IOException{
27+
if(c.getMethod() instanceof AMQP.Connection.CloseOk) validShutdown.set(true);
28+
return super.processControlCommand(c);
29+
}
30+
}
31+
32+
33+
public void testCloseOKNormallyReceived() throws Exception{
34+
SpecialConnection connection = new SpecialConnection();
35+
connection.close();
36+
assertTrue(connection.hadValidShutdown());
37+
}
38+
39+
// The thrown runtime exception should get intercepted by the
40+
// consumer exception handler, and result in a clean shut down.
41+
public void testCloseWithFaultyConsumer() throws Exception{
42+
SpecialConnection connection = new SpecialConnection();
43+
Channel channel = connection.createChannel();
44+
channel.exchangeDeclare("x", "direct");
45+
channel.queueDeclare("q");
46+
channel.queueDelete("q");
47+
channel.queueDeclare("q");
48+
channel.queueBind("q", "x", "k");
49+
50+
final CountDownLatch latch = new CountDownLatch(1);
51+
52+
channel.basicConsume("q", true, new DefaultConsumer(channel){
53+
public void handleDelivery(String consumerTag,
54+
Envelope envelope,
55+
AMQP.BasicProperties properties,
56+
byte[] body){
57+
latch.countDown();
58+
throw new RuntimeException("I am a bad consumer");
59+
}
60+
});
61+
62+
channel.basicPublish("x", "k", null, new byte[10]);
63+
64+
latch.await();
65+
Thread.sleep(200);
66+
assertTrue(connection.hadValidShutdown());
67+
}
68+
69+
}

0 commit comments

Comments
 (0)