Skip to content

Commit b365364

Browse files
author
Emile Joubert
committed
Merged stable into default
2 parents 91174f7 + a291913 commit b365364

32 files changed

+1052
-301
lines changed

scripts/runperftest.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
run() {
44
echo "=== running with '$2'"
5-
sh `dirname $0`/runjava.sh com.rabbitmq.examples.MulticastMain -h $1 -z 10 -i 20 $2
5+
sh `dirname $0`/runjava.sh com.rabbitmq.examples.PerfTest -h $1 -z 10 -i 20 $2
66
sleep 2
77
}
88

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License
4+
// at http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8+
// the License for the specific language governing rights and
9+
// limitations under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developer of the Original Code is GoPivotal, Inc.
14+
// Copyright (c) 2013-2013 GoPivotal, Inc. All rights reserved.
15+
//
16+
17+
package com.rabbitmq.client;
18+
19+
/**
20+
* Thrown when the broker refuses access due to an authentication failure.
21+
*/
22+
23+
public class AuthenticationFailureException extends PossibleAuthenticationFailureException
24+
{
25+
public AuthenticationFailureException(String reason) {
26+
super(reason);
27+
}
28+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License
4+
// at http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8+
// the License for the specific language governing rights and
9+
// limitations under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developer of the Original Code is GoPivotal, Inc.
14+
// Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved.
15+
//
16+
17+
18+
package com.rabbitmq.client;
19+
20+
import java.io.IOException;
21+
22+
/**
23+
* Implement this interface in order to be notified of connection block and
24+
* unblock events.
25+
*/
26+
public interface BlockedListener {
27+
void handleBlocked(String reason) throws IOException;
28+
void handleUnblocked() throws IOException;
29+
}

src/com/rabbitmq/client/Connection.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,4 +214,24 @@ public interface Connection extends ShutdownNotifier { // rename to AMQPConnecti
214214
* operations, use -1 for infinity
215215
*/
216216
void abort(int closeCode, String closeMessage, int timeout);
217+
218+
/**
219+
* Add a {@link BlockedListener}.
220+
* @param listener the listener to add
221+
*/
222+
void addBlockedListener(BlockedListener listener);
223+
224+
/**
225+
* Remove a {@link BlockedListener}.
226+
* @param listener the listener to remove
227+
* @return <code><b>true</b></code> if the listener was found and removed,
228+
* <code><b>false</b></code> otherwise
229+
*/
230+
boolean removeBlockedListener(BlockedListener listener);
231+
232+
/**
233+
* Remove all {@link BlockedListener}s.
234+
*/
235+
void clearBlockedListeners();
236+
217237
}

src/com/rabbitmq/client/PossibleAuthenticationFailureException.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,9 @@ public PossibleAuthenticationFailureException(Throwable cause)
3131
super("Possibly caused by authentication failure");
3232
super.initCause(cause);
3333
}
34+
35+
public PossibleAuthenticationFailureException(String reason)
36+
{
37+
super(reason);
38+
}
3439
}

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,17 @@
2121
import java.net.InetAddress;
2222
import java.net.SocketException;
2323
import java.net.SocketTimeoutException;
24+
import java.util.Collection;
2425
import java.util.Collections;
2526
import java.util.HashMap;
2627
import java.util.Map;
28+
import java.util.concurrent.CopyOnWriteArrayList;
2729
import java.util.concurrent.ExecutorService;
2830
import java.util.concurrent.TimeoutException;
2931

3032
import com.rabbitmq.client.AMQP;
33+
import com.rabbitmq.client.AuthenticationFailureException;
34+
import com.rabbitmq.client.BlockedListener;
3135
import com.rabbitmq.client.Method;
3236
import com.rabbitmq.client.AlreadyClosedException;
3337
import com.rabbitmq.client.Channel;
@@ -81,6 +85,8 @@ public static final Map<String, Object> defaultClientProperties() {
8185
capabilities.put("exchange_exchange_bindings", true);
8286
capabilities.put("basic.nack", true);
8387
capabilities.put("consumer_cancel_notify", true);
88+
capabilities.put("connection.blocked", true);
89+
capabilities.put("authentication_failure_close", true);
8490

8591
props.put("capabilities", capabilities);
8692

@@ -130,6 +136,7 @@ public static final Map<String, Object> defaultClientProperties() {
130136
private final int requestedFrameMax;
131137
private final String username;
132138
private final String password;
139+
private final Collection<BlockedListener> blockedListeners = new CopyOnWriteArrayList<BlockedListener>();
133140

134141
/* State modified after start - all volatile */
135142

@@ -274,9 +281,10 @@ public AMQConnection(String username,
274281
* either before, or during, protocol negotiation;
275282
* sub-classes {@link ProtocolVersionMismatchException} and
276283
* {@link PossibleAuthenticationFailureException} will be thrown in the
277-
* corresponding circumstances. If an exception is thrown, connection
278-
* resources allocated can all be garbage collected when the connection
279-
* object is no longer referenced.
284+
* corresponding circumstances. {@link AuthenticationFailureException}
285+
* will be thrown if the broker closes the connection with ACCESS_REFUSED.
286+
* If an exception is thrown, connection resources allocated can all be
287+
* garbage collected when the connection object is no longer referenced.
280288
*/
281289
public void start()
282290
throws IOException
@@ -352,6 +360,16 @@ public void start()
352360
response = sm.handleChallenge(challenge, this.username, this.password);
353361
}
354362
} catch (ShutdownSignalException e) {
363+
Object shutdownReason = e.getReason();
364+
if (shutdownReason instanceof AMQCommand) {
365+
Method shutdownMethod = ((AMQCommand) shutdownReason).getMethod();
366+
if (shutdownMethod instanceof AMQP.Connection.Close) {
367+
AMQP.Connection.Close shutdownClose = (AMQP.Connection.Close) shutdownMethod;
368+
if (shutdownClose.getReplyCode() == AMQP.ACCESS_REFUSED) {
369+
throw new AuthenticationFailureException(shutdownClose.getReplyText());
370+
}
371+
}
372+
}
355373
throw new PossibleAuthenticationFailureException(e);
356374
}
357375
} while (connTune == null);
@@ -597,6 +615,25 @@ public boolean processControlCommand(Command c) throws IOException
597615
if (method instanceof AMQP.Connection.Close) {
598616
handleConnectionClose(c);
599617
return true;
618+
} else if (method instanceof AMQP.Connection.Blocked) {
619+
AMQP.Connection.Blocked blocked = (AMQP.Connection.Blocked) method;
620+
try {
621+
for (BlockedListener l : this.blockedListeners) {
622+
l.handleBlocked(blocked.getReason());
623+
}
624+
} catch (Throwable ex) {
625+
getExceptionHandler().handleBlockedListenerException(this, ex);
626+
}
627+
return true;
628+
} else if (method instanceof AMQP.Connection.Unblocked) {
629+
try {
630+
for (BlockedListener l : this.blockedListeners) {
631+
l.handleUnblocked();
632+
}
633+
} catch (Throwable ex) {
634+
getExceptionHandler().handleBlockedListenerException(this, ex);
635+
}
636+
return true;
600637
} else {
601638
return false;
602639
}
@@ -621,7 +658,7 @@ public boolean processControlCommand(Command c) throws IOException
621658
}
622659

623660
public void handleConnectionClose(Command closeCommand) {
624-
ShutdownSignalException sse = shutdown(closeCommand, false, null, false);
661+
ShutdownSignalException sse = shutdown(closeCommand, false, null, _inConnectionNegotiation);
625662
try {
626663
_channel0.quiescingTransmit(new AMQP.Connection.CloseOk.Builder().build());
627664
} catch (IOException _) { } // ignore
@@ -823,4 +860,16 @@ public AMQCommand transformReply(AMQCommand command) {
823860
private String getHostAddress() {
824861
return getAddress() == null ? null : getAddress().getHostAddress();
825862
}
863+
864+
public void addBlockedListener(BlockedListener listener) {
865+
blockedListeners.add(listener);
866+
}
867+
868+
public boolean removeBlockedListener(BlockedListener listener) {
869+
return blockedListeners.remove(listener);
870+
}
871+
872+
public void clearBlockedListeners() {
873+
blockedListeners.clear();
874+
}
826875
}

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,7 @@ private void releaseChannel() {
394394
}
395395
if (callback != null) {
396396
try {
397-
callback.handleCancel(consumerTag);
397+
this.dispatcher.handleCancel(callback, consumerTag);
398398
} catch (Throwable ex) {
399399
getConnection().getExceptionHandler().handleConsumerException(this,
400400
ex,

src/com/rabbitmq/client/impl/ConsumerDispatcher.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,25 @@ public void run() {
100100
});
101101
}
102102

103+
public void handleCancel(final Consumer delegate, final String consumerTag) {
104+
executeUnlessShuttingDown(
105+
new Runnable() {
106+
public void run() {
107+
try {
108+
delegate.handleCancel(consumerTag);
109+
} catch (Throwable ex) {
110+
connection.getExceptionHandler().handleConsumerException(
111+
channel,
112+
ex,
113+
delegate,
114+
consumerTag,
115+
"handleCancel");
116+
}
117+
}
118+
});
119+
}
120+
121+
103122
public void handleRecoverOk(final Consumer delegate, final String consumerTag) {
104123
executeUnlessShuttingDown(
105124
new Runnable() {

src/com/rabbitmq/client/impl/DefaultExceptionHandler.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ public void handleConfirmListenerException(Channel channel, Throwable exception)
4949
handleChannelKiller(channel, exception, "ConfirmListener.handle{N,A}ck");
5050
}
5151

52+
public void handleBlockedListenerException(Connection connection, Throwable exception) {
53+
handleConnectionKiller(connection, exception, "BlockedListener");
54+
}
55+
5256
public void handleConsumerException(Channel channel, Throwable exception,
5357
Consumer consumer, String consumerTag,
5458
String methodName)
@@ -76,4 +80,22 @@ protected void handleChannelKiller(Channel channel, Throwable exception, String
7680
channel.getConnection().abort(AMQP.INTERNAL_ERROR, "Internal error closing channel for " + what);
7781
}
7882
}
83+
84+
protected void handleConnectionKiller(Connection connection, Throwable exception, String what) {
85+
// TODO: log the exception
86+
System.err.println("DefaultExceptionHandler: " + what + " threw an exception for connection "
87+
+ connection + ":");
88+
exception.printStackTrace();
89+
try {
90+
connection.close(AMQP.REPLY_SUCCESS, "Closed due to exception from " + what);
91+
} catch (AlreadyClosedException ace) {
92+
// noop
93+
} catch (IOException ioe) {
94+
// TODO: log the failure
95+
System.err.println("Failure during close of connection " + connection + " after " + exception
96+
+ ":");
97+
ioe.printStackTrace();
98+
connection.abort(AMQP.INTERNAL_ERROR, "Internal error closing connection for " + what);
99+
}
100+
}
79101
}

src/com/rabbitmq/client/impl/ExceptionHandler.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,16 @@ public interface ExceptionHandler {
6363
*/
6464
void handleConfirmListenerException(Channel channel, Throwable exception);
6565

66+
/**
67+
* Perform any required exception processing for the situation
68+
* when the driver thread for the connection has called a
69+
* BlockedListener's method, and that method has
70+
* thrown an exception.
71+
* @param connection the Connection that held the BlockedListener
72+
* @param exception the exception thrown by the BlockedListener
73+
*/
74+
void handleBlockedListenerException(Connection connection, Throwable exception);
75+
6676
/**
6777
* Perform any required exception processing for the situation
6878
* when the driver thread for the connection has called a method

test/src/com/rabbitmq/client/test/AMQConnectionTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,10 @@ public void handleConfirmListenerException(Channel ch, Throwable ex) {
242242
fail("handleConfirmListenerException: " + ex);
243243
}
244244

245+
public void handleBlockedListenerException(Connection conn, Throwable ex) {
246+
fail("handleBlockedListenerException: " + ex);
247+
}
248+
245249
public void handleConsumerException(Channel ch,
246250
Throwable ex,
247251
Consumer c,

test/src/com/rabbitmq/client/test/BrokerTestCase.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,4 +248,17 @@ protected void deleteExchange(String x) throws IOException {
248248
protected void deleteQueue(String q) throws IOException {
249249
channel.queueDelete(q);
250250
}
251+
252+
protected void clearAllResourceAlarms() throws IOException, InterruptedException {
253+
clearResourceAlarm("memory");
254+
clearResourceAlarm("disk");
255+
}
256+
257+
protected void setResourceAlarm(String source) throws IOException, InterruptedException {
258+
Host.executeCommand("cd ../rabbitmq-test; make set-resource-alarm SOURCE=" + source);
259+
}
260+
261+
protected void clearResourceAlarm(String source) throws IOException, InterruptedException {
262+
Host.executeCommand("cd ../rabbitmq-test; make clear-resource-alarm SOURCE=" + source);
263+
}
251264
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package com.rabbitmq.client.test.functional;
2+
3+
import com.rabbitmq.client.AlreadyClosedException;
4+
import com.rabbitmq.client.Channel;
5+
import com.rabbitmq.client.test.BrokerTestCase;
6+
7+
import java.io.IOException;
8+
9+
public class BasicGet extends BrokerTestCase {
10+
public void testBasicGetWithEnqueuedMessages() throws IOException, InterruptedException {
11+
assertTrue(channel.isOpen());
12+
String q = channel.queueDeclare().getQueue();
13+
14+
basicPublishPersistent("msg".getBytes("UTF-8"), q);
15+
Thread.sleep(250);
16+
17+
assertNotNull(channel.basicGet(q, true));
18+
channel.queuePurge(q);
19+
assertNull(channel.basicGet(q, true));
20+
channel.queueDelete(q);
21+
}
22+
23+
public void testBasicGetWithEmptyQueue() throws IOException, InterruptedException {
24+
assertTrue(channel.isOpen());
25+
String q = channel.queueDeclare().getQueue();
26+
27+
assertNull(channel.basicGet(q, true));
28+
channel.queueDelete(q);
29+
}
30+
31+
public void testBasicGetWithClosedChannel() throws IOException, InterruptedException {
32+
assertTrue(channel.isOpen());
33+
String q = channel.queueDeclare().getQueue();
34+
35+
channel.close();
36+
assertFalse(channel.isOpen());
37+
try {
38+
channel.basicGet(q, true);
39+
fail("expected basic.get on a closed channel to fail");
40+
} catch (AlreadyClosedException e) {
41+
// passed
42+
} finally {
43+
Channel tch = connection.createChannel();
44+
tch.queueDelete(q);
45+
tch.close();
46+
}
47+
48+
}
49+
}

0 commit comments

Comments
 (0)