Skip to content

Commit 76fbe60

Browse files
Merge pull request #331 from rabbitmq/rabbitmq-java-client-319
Get rid of read retry in NIO
2 parents 34cce9b + e4df45f commit 76fbe60

13 files changed

+475
-265
lines changed

src/main/java/com/rabbitmq/client/impl/nio/ByteBufferInputStream.java

Lines changed: 0 additions & 57 deletions
This file was deleted.
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
16+
package com.rabbitmq.client.impl.nio;
17+
18+
import com.rabbitmq.client.AMQP;
19+
import com.rabbitmq.client.MalformedFrameException;
20+
import com.rabbitmq.client.impl.Frame;
21+
22+
import java.io.DataInputStream;
23+
import java.io.IOException;
24+
import java.nio.ByteBuffer;
25+
import java.nio.channels.ReadableByteChannel;
26+
27+
/**
28+
* Class to create AMQP frames from a {@link ReadableByteChannel}.
29+
* Supports partial frames: a frame can be read in several attempts
30+
* from the {@link NioLoop}. This can happen when the channel won't
31+
* read any more bytes in the middle of a frame building. The state
32+
* of the outstanding frame is saved up, and the builder will
33+
* start where it left off when the {@link NioLoop} comes back to
34+
* this connection.
35+
* This class is not thread safe.
36+
*
37+
* @since 4.4.0
38+
*/
39+
public class FrameBuilder {
40+
41+
private static final int PAYLOAD_OFFSET = 1 /* type */ + 2 /* channel */ + 4 /* payload size */;
42+
43+
protected final ReadableByteChannel channel;
44+
45+
protected final ByteBuffer applicationBuffer;
46+
// to store the bytes of the outstanding data
47+
// 3 byte-long because the longest we read is an unsigned int
48+
// (not need to store the latest byte)
49+
private final int[] frameBuffer = new int[3];
50+
private int frameType;
51+
private int frameChannel;
52+
private byte[] framePayload;
53+
private int bytesRead = 0;
54+
55+
public FrameBuilder(ReadableByteChannel channel, ByteBuffer buffer) {
56+
this.channel = channel;
57+
this.applicationBuffer = buffer;
58+
}
59+
60+
/**
61+
* Read a frame from the network.
62+
* This method returns null f a frame could not have been fully built from
63+
* the network. The client must then retry later (typically
64+
* when the channel notifies it has something to read).
65+
*
66+
* @return a complete frame or null if a frame couldn't have been fully built
67+
* @throws IOException
68+
* @see Frame#readFrom(DataInputStream)
69+
*/
70+
public Frame readFrame() throws IOException {
71+
while (somethingToRead()) {
72+
if (bytesRead == 0) {
73+
// type
74+
frameType = readFromBuffer();
75+
if (frameType == 'A') {
76+
handleProtocolVersionMismatch();
77+
}
78+
} else if (bytesRead == 1) {
79+
// channel 1/2
80+
frameBuffer[0] = readFromBuffer();
81+
} else if (bytesRead == 2) {
82+
// channel 2/2
83+
frameChannel = (frameBuffer[0] << 8) + (readFromBuffer() << 0);
84+
} else if (bytesRead == 3) {
85+
// payload size 1/4
86+
frameBuffer[0] = readFromBuffer();
87+
} else if (bytesRead == 4) {
88+
// payload size 2/4
89+
frameBuffer[1] = readFromBuffer();
90+
} else if (bytesRead == 5) {
91+
// payload size 3/4
92+
frameBuffer[2] = readFromBuffer();
93+
} else if (bytesRead == 6) {
94+
// payload size 4/4
95+
int framePayloadSize = ((frameBuffer[0] << 24) + (frameBuffer[1] << 16) + (frameBuffer[2] << 8) + (readFromBuffer() << 0));
96+
framePayload = new byte[framePayloadSize];
97+
} else if (bytesRead >= PAYLOAD_OFFSET && bytesRead < framePayload.length + PAYLOAD_OFFSET) {
98+
framePayload[bytesRead - PAYLOAD_OFFSET] = (byte) readFromBuffer();
99+
} else if (bytesRead == framePayload.length + PAYLOAD_OFFSET) {
100+
int frameEndMarker = readFromBuffer();
101+
if (frameEndMarker != AMQP.FRAME_END) {
102+
throw new MalformedFrameException("Bad frame end marker: " + frameEndMarker);
103+
}
104+
bytesRead = 0;
105+
return new Frame(frameType, frameChannel, framePayload);
106+
} else {
107+
throw new IllegalStateException("Number of read bytes incorrect: " + bytesRead);
108+
}
109+
bytesRead++;
110+
}
111+
return null;
112+
}
113+
114+
/**
115+
* Tells whether there's something to read in the application buffer or not.
116+
* Tries to read from the network if necessary.
117+
*
118+
* @return true if there's something to read in the application buffer
119+
* @throws IOException
120+
*/
121+
protected boolean somethingToRead() throws IOException {
122+
if (!applicationBuffer.hasRemaining()) {
123+
applicationBuffer.clear();
124+
int read = NioHelper.read(channel, applicationBuffer);
125+
applicationBuffer.flip();
126+
if (read > 0) {
127+
return true;
128+
} else {
129+
return false;
130+
}
131+
} else {
132+
return true;
133+
}
134+
}
135+
136+
private int readFromBuffer() {
137+
return applicationBuffer.get() & 0xff;
138+
}
139+
140+
/**
141+
* Handle a protocol version mismatch.
142+
* @return
143+
* @throws IOException
144+
* @see Frame#protocolVersionMismatch(DataInputStream)
145+
*/
146+
private void handleProtocolVersionMismatch() throws IOException {
147+
// Probably an AMQP.... header indicating a version mismatch
148+
// Otherwise meaningless, so try to read the version,
149+
// and throw an exception, whether we read the version
150+
// okay or not.
151+
// Try to read everything from the network, this header
152+
// is small and should never require several network reads.
153+
byte[] expectedBytes = new byte[] { 'M', 'Q', 'P' };
154+
int expectedBytesCount = 0;
155+
while (somethingToRead() && expectedBytesCount < 3) {
156+
// We expect the letters M, Q, P in that order: generate an informative error if they're not found
157+
int nextByte = readFromBuffer();
158+
if (nextByte != expectedBytes[expectedBytesCount]) {
159+
throw new MalformedFrameException("Invalid AMQP protocol header from server: expected character " +
160+
expectedBytes[expectedBytesCount] + ", got " + nextByte);
161+
}
162+
expectedBytesCount++;
163+
}
164+
165+
if (expectedBytesCount != 3) {
166+
throw new MalformedFrameException("Invalid AMQP protocol header from server: read only "
167+
+ (expectedBytesCount + 1) + " byte(s) instead of 4");
168+
}
169+
170+
int[] signature = new int[4];
171+
172+
for (int i = 0; i < 4; i++) {
173+
if (somethingToRead()) {
174+
signature[i] = readFromBuffer();
175+
} else {
176+
throw new MalformedFrameException("Invalid AMQP protocol header from server");
177+
}
178+
}
179+
180+
MalformedFrameException x;
181+
182+
if (signature[0] == 1 &&
183+
signature[1] == 1 &&
184+
signature[2] == 8 &&
185+
signature[3] == 0) {
186+
x = new MalformedFrameException("AMQP protocol version mismatch; we are version " +
187+
AMQP.PROTOCOL.MAJOR + "-" + AMQP.PROTOCOL.MINOR + "-" + AMQP.PROTOCOL.REVISION +
188+
", server is 0-8");
189+
} else {
190+
String sig = "";
191+
for (int i = 0; i < 4; i++) {
192+
if (i != 0)
193+
sig += ",";
194+
sig += signature[i];
195+
}
196+
197+
x = new MalformedFrameException("AMQP protocol version mismatch; we are version " +
198+
AMQP.PROTOCOL.MAJOR + "-" + AMQP.PROTOCOL.MINOR + "-" + AMQP.PROTOCOL.REVISION +
199+
", server sent signature " + sig);
200+
}
201+
throw x;
202+
}
203+
}

src/main/java/com/rabbitmq/client/impl/nio/HeaderWriteRequest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@
2525
*/
2626
public class HeaderWriteRequest implements WriteRequest {
2727

28+
public static final WriteRequest SINGLETON = new HeaderWriteRequest();
29+
30+
private HeaderWriteRequest() { }
31+
2832
@Override
2933
public void handle(DataOutputStream outputStream) throws IOException {
3034
outputStream.write("AMQP".getBytes("US-ASCII"));

src/main/java/com/rabbitmq/client/impl/nio/NioHelper.java

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -29,29 +29,4 @@ static int read(ReadableByteChannel channel, ByteBuffer buffer) throws IOExcepti
2929
return read;
3030
}
3131

32-
static int retryRead(ReadableByteChannel channel, ByteBuffer buffer) throws IOException {
33-
int attempt = 0;
34-
int read = 0;
35-
while(attempt < 3) {
36-
try {
37-
Thread.sleep(100L);
38-
} catch (InterruptedException e) {
39-
// ignore
40-
}
41-
read = read(channel, buffer);
42-
if(read > 0) {
43-
break;
44-
}
45-
attempt++;
46-
}
47-
return read;
48-
}
49-
50-
static int readWithRetry(ReadableByteChannel channel, ByteBuffer buffer) throws IOException {
51-
int bytesRead = NioHelper.read(channel, buffer);
52-
if (bytesRead <= 0) {
53-
bytesRead = NioHelper.retryRead(channel, buffer);
54-
}
55-
return bytesRead;
56-
}
5732
}

src/main/java/com/rabbitmq/client/impl/nio/NioLoop.java

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.slf4j.Logger;
2121
import org.slf4j.LoggerFactory;
2222

23-
import java.io.DataInputStream;
2423
import java.io.DataOutputStream;
2524
import java.io.IOException;
2625
import java.nio.ByteBuffer;
@@ -144,27 +143,27 @@ public void run() {
144143
continue;
145144
}
146145

147-
DataInputStream inputStream = state.inputStream;
148-
149146
state.prepareForReadSequence();
150147

151148
while (state.continueReading()) {
152-
Frame frame = Frame.readFrom(inputStream);
153-
154-
try {
155-
boolean noProblem = state.getConnection().handleReadFrame(frame);
156-
if (noProblem && (!state.getConnection().isRunning() || state.getConnection().hasBrokerInitiatedShutdown())) {
157-
// looks like the frame was Close-Ok or Close
158-
dispatchShutdownToConnection(state);
149+
final Frame frame = state.frameBuilder.readFrame();
150+
151+
if (frame != null) {
152+
try {
153+
boolean noProblem = state.getConnection().handleReadFrame(frame);
154+
if (noProblem && (!state.getConnection().isRunning() || state.getConnection().hasBrokerInitiatedShutdown())) {
155+
// looks like the frame was Close-Ok or Close
156+
dispatchShutdownToConnection(state);
157+
key.cancel();
158+
break;
159+
}
160+
} catch (Throwable ex) {
161+
// problem during frame processing, tell connection, and
162+
// we can stop for this channel
163+
handleIoError(state, ex);
159164
key.cancel();
160165
break;
161166
}
162-
} catch (Throwable ex) {
163-
// problem during frame processing, tell connection, and
164-
// we can stop for this channel
165-
handleIoError(state, ex);
166-
key.cancel();
167-
break;
168167
}
169168
}
170169

src/main/java/com/rabbitmq/client/impl/nio/NioParams.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626

2727
/**
2828
* Parameters used to configure the NIO mode of a {@link com.rabbitmq.client.ConnectionFactory}.
29-
*
29+
* @since 4.0.0
3030
*/
3131
public class NioParams {
3232

0 commit comments

Comments
 (0)