Skip to content

Commit 638c5c1

Browse files
committed
Make NIO tests more reliable
1 parent 46263cd commit 638c5c1

File tree

2 files changed

+66
-9
lines changed

2 files changed

+66
-9
lines changed

src/test/java/com/rabbitmq/client/test/JavaNioTest.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import com.rabbitmq.client.*;
44
import com.rabbitmq.client.impl.nio.NioParams;
5+
import org.junit.After;
6+
import org.junit.Before;
57
import org.junit.Test;
68

79
import java.io.IOException;
@@ -14,6 +16,25 @@
1416
*/
1517
public class JavaNioTest {
1618

19+
public static final String QUEUE = "nio.queue";
20+
21+
private Connection testConnection;
22+
23+
@Before
24+
public void init() throws Exception {
25+
ConnectionFactory connectionFactory = new ConnectionFactory();
26+
connectionFactory.useNio();
27+
testConnection = connectionFactory.newConnection();
28+
}
29+
30+
@After
31+
public void tearDown() throws Exception {
32+
if (testConnection != null) {
33+
testConnection.createChannel().queueDelete(QUEUE);
34+
testConnection.close();
35+
}
36+
}
37+
1738
@Test
1839
public void connection() throws Exception {
1940
CountDownLatch latch = new CountDownLatch(1);
@@ -101,6 +122,18 @@ public void nioLoopCleaning() throws Exception {
101122
}
102123
}
103124

125+
@Test public void messageSize() throws Exception {
126+
for (int i = 0; i < 50; i++) {
127+
sendAndVerifyMessage(testConnection, 76390);
128+
}
129+
}
130+
131+
private void sendAndVerifyMessage(Connection connection, int size) throws Exception {
132+
CountDownLatch latch = new CountDownLatch(1);
133+
boolean messageReceived = basicGetBasicConsume(connection, QUEUE, latch, size);
134+
assertTrue("Message has not been received", messageReceived);
135+
}
136+
104137
private Connection basicGetBasicConsume(ConnectionFactory connectionFactory, String queue, final CountDownLatch latch)
105138
throws IOException, TimeoutException {
106139
Connection connection = connectionFactory.newConnection();
@@ -121,6 +154,28 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
121154
return connection;
122155
}
123156

157+
private boolean basicGetBasicConsume(Connection connection, String queue, final CountDownLatch latch, int msgSize)
158+
throws Exception {
159+
Channel channel = connection.createChannel();
160+
channel.queueDeclare(queue, false, false, false, null);
161+
channel.queuePurge(queue);
162+
163+
channel.basicPublish("", queue, null, new byte[msgSize]);
164+
165+
final String tag = channel.basicConsume(queue, false, new DefaultConsumer(channel) {
166+
167+
@Override
168+
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
169+
getChannel().basicAck(envelope.getDeliveryTag(), false);
170+
latch.countDown();
171+
}
172+
});
173+
174+
boolean done = latch.await(20, TimeUnit.SECONDS);
175+
channel.basicCancel(tag);
176+
return done;
177+
}
178+
124179
private void safeClose(Connection connection) {
125180
if (connection != null) {
126181
try {

src/test/java/com/rabbitmq/client/test/ssl/NioTlsUnverifiedConnection.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ protected void releaseResources() throws IOException {
7171
@Test
7272
public void connectionGetConsume() throws Exception {
7373
CountDownLatch latch = new CountDownLatch(1);
74-
connection = basicGetBasicConsume(connection, QUEUE, latch, 100 * 1000);
74+
basicGetBasicConsume(connection, QUEUE, latch, 100 * 1000);
7575
boolean messagesReceived = latch.await(5, TimeUnit.SECONDS);
7676
assertTrue("Message has not been received", messagesReceived);
7777
}
@@ -120,30 +120,32 @@ public void configure(SSLEngine sslEngine) throws IOException {
120120

121121
private void sendAndVerifyMessage(int size) throws Exception {
122122
CountDownLatch latch = new CountDownLatch(1);
123-
connection = basicGetBasicConsume(connection, QUEUE, latch, size);
124-
boolean messagesReceived = latch.await(20, TimeUnit.SECONDS);
125-
assertTrue("Message has not been received", messagesReceived);
123+
boolean messageReceived = basicGetBasicConsume(connection, QUEUE, latch, size);
124+
assertTrue("Message has not been received", messageReceived);
126125
}
127126

128-
private Connection basicGetBasicConsume(Connection connection, String queue, final CountDownLatch latch, int msgSize)
129-
throws IOException, TimeoutException {
127+
private boolean basicGetBasicConsume(Connection connection, String queue, final CountDownLatch latch, int msgSize)
128+
throws Exception {
130129
Channel channel = connection.createChannel();
131130
channel.queueDeclare(queue, false, false, false, null);
132131
channel.queuePurge(queue);
133132

134133
channel.basicPublish("", queue, null, new byte[msgSize]);
135134

136-
channel.basicConsume(queue, false, new DefaultConsumer(channel) {
135+
String tag = channel.basicConsume(queue, false, new DefaultConsumer(channel) {
137136

138137
@Override
139138
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
140139
getChannel().basicAck(envelope.getDeliveryTag(), false);
141140
latch.countDown();
142-
getChannel().basicCancel(consumerTag);
143141
}
144142
});
145143

146-
return connection;
144+
boolean messageReceived = latch.await(20, TimeUnit.SECONDS);
145+
146+
channel.basicCancel(tag);
147+
148+
return messageReceived;
147149
}
148150

149151
}

0 commit comments

Comments
 (0)