Skip to content

Commit cdd3daa

Browse files
anuchandyLiudmila Molkova
andauthored
[Messaging] Reduce allocation for Send by using RedableBuffer (Azure#34591)
* Using RedableBuffer to send * better fun name, revert collapsed imports * Using rewind to seek the position when retrying Co-authored-by: Liudmila Molkova <[email protected]> * Productize the ReadableBuffer based apporach * remove var added for dev-time testing * Moving code comment to git description * changelog update * let's not new-up ReadableBuffer in non-batch cases * (review feedback): use monoError to report buffer-overflow and log when a message in the batch is empty. --------- Co-authored-by: Liudmila Molkova <[email protected]>
1 parent f079ad7 commit cdd3daa

File tree

4 files changed

+147
-78
lines changed

4 files changed

+147
-78
lines changed

sdk/core/azure-core-amqp/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
### Bugs Fixed
1010

11+
- Addressing the overhead of batch send allocating byte array equal to the max message size. ([34426](https://github.com/Azure/azure-sdk-for-java/issues/34426))
12+
1113
### Other Changes
1214

1315
## 2.8.4 (2023-04-07)

sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java

Lines changed: 108 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import com.azure.core.amqp.implementation.handler.SendLinkHandler;
1515
import com.azure.core.util.AsyncCloseable;
1616
import com.azure.core.util.CoreUtils;
17+
import com.azure.core.util.FluxUtil;
1718
import com.azure.core.util.logging.ClientLogger;
1819
import org.apache.qpid.proton.Proton;
1920
import org.apache.qpid.proton.amqp.Binary;
@@ -26,6 +27,8 @@
2627
import org.apache.qpid.proton.amqp.transaction.Declared;
2728
import org.apache.qpid.proton.amqp.transport.DeliveryState;
2829
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
30+
import org.apache.qpid.proton.codec.CompositeReadableBuffer;
31+
import org.apache.qpid.proton.codec.ReadableBuffer;
2932
import org.apache.qpid.proton.engine.Delivery;
3033
import org.apache.qpid.proton.engine.EndpointState;
3134
import org.apache.qpid.proton.engine.Sender;
@@ -44,6 +47,7 @@
4447
import java.nio.BufferOverflowException;
4548
import java.time.Duration;
4649
import java.time.Instant;
50+
import java.util.Arrays;
4751
import java.util.Comparator;
4852
import java.util.List;
4953
import java.util.Locale;
@@ -265,62 +269,103 @@ public Mono<Void> send(List<Message> messageBatch, DeliveryState deliveryState)
265269

266270
return getLinkSize()
267271
.flatMap(maxMessageSize -> {
268-
final Message firstMessage = messageBatch.get(0);
272+
int totalEncodedSize = 0;
273+
final CompositeReadableBuffer buffer = new CompositeReadableBuffer();
274+
275+
final byte[] envelopBytes = batchEnvelopBytes(messageBatch.get(0), maxMessageSize);
276+
if (envelopBytes.length > 0) {
277+
totalEncodedSize += envelopBytes.length;
278+
if (totalEncodedSize > maxMessageSize) {
279+
return batchBufferOverflowError(maxMessageSize);
280+
}
281+
buffer.append(envelopBytes);
282+
}
269283

270-
// proton-j doesn't support multiple dataSections to be part of AmqpMessage
271-
// here's the alternate approach provided by them: https://github.com/apache/qpid-proton/pull/54
272-
final Message batchMessage = Proton.message();
273-
batchMessage.setMessageAnnotations(firstMessage.getMessageAnnotations());
284+
for (final Message message : messageBatch) {
285+
final byte[] sectionBytes = batchBinaryDataSectionBytes(message, maxMessageSize);
286+
if (sectionBytes.length > 0) {
287+
totalEncodedSize += sectionBytes.length;
288+
if (totalEncodedSize > maxMessageSize) {
289+
return batchBufferOverflowError(maxMessageSize);
290+
}
291+
buffer.append(sectionBytes);
292+
} else {
293+
logger.info("Ignoring the empty message org.apache.qpid.proton.message.message@{} in the batch.",
294+
Integer.toHexString(System.identityHashCode(message)));
295+
}
296+
}
274297

275-
// Set partition identifier properties of the first message on batch message
276-
if ((firstMessage.getMessageId() instanceof String)
277-
&& !CoreUtils.isNullOrEmpty((String) firstMessage.getMessageId())) {
298+
return send(buffer, AmqpConstants.AMQP_BATCH_MESSAGE_FORMAT, deliveryState);
299+
}).then();
300+
}
278301

279-
batchMessage.setMessageId(firstMessage.getMessageId());
280-
}
302+
private byte[] batchEnvelopBytes(Message envelopMessage, int maxMessageSize) {
303+
// Proton-j doesn't support multiple dataSections to be part of AmqpMessage.
304+
// Here's the alternate approach provided: https://github.com/apache/qpid-proton/pull/54
305+
final Message message = Proton.message();
306+
message.setMessageAnnotations(envelopMessage.getMessageAnnotations());
281307

282-
if (!CoreUtils.isNullOrEmpty(firstMessage.getGroupId())) {
283-
batchMessage.setGroupId(firstMessage.getGroupId());
284-
}
308+
// Set partition identifier properties of the first message on batch message
309+
if ((envelopMessage.getMessageId() instanceof String)
310+
&& !CoreUtils.isNullOrEmpty((String) envelopMessage.getMessageId())) {
285311

286-
final int maxMessageSizeTemp = maxMessageSize;
287-
288-
final byte[] bytes = new byte[maxMessageSizeTemp];
289-
int encodedSize = batchMessage.encode(bytes, 0, maxMessageSizeTemp);
290-
int byteArrayOffset = encodedSize;
291-
292-
for (final Message amqpMessage : messageBatch) {
293-
final Message messageWrappedByData = Proton.message();
294-
295-
int payloadSize = messageSerializer.getSize(amqpMessage);
296-
int allocationSize =
297-
Math.min(payloadSize + MAX_AMQP_HEADER_SIZE_BYTES, maxMessageSizeTemp);
298-
299-
byte[] messageBytes = new byte[allocationSize];
300-
int messageSizeBytes = amqpMessage.encode(messageBytes, 0, allocationSize);
301-
messageWrappedByData.setBody(new Data(new Binary(messageBytes, 0, messageSizeBytes)));
302-
303-
try {
304-
encodedSize =
305-
messageWrappedByData
306-
.encode(bytes, byteArrayOffset, maxMessageSizeTemp - byteArrayOffset - 1);
307-
} catch (BufferOverflowException exception) {
308-
final String message =
309-
String.format(Locale.US,
310-
"Size of the payload exceeded maximum message size: %s kb",
311-
maxMessageSizeTemp / 1024);
312-
final AmqpException error = new AmqpException(false,
313-
AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED, message, exception,
314-
handler.getErrorContext(sender));
315-
316-
return Mono.error(error);
317-
}
312+
message.setMessageId(envelopMessage.getMessageId());
313+
}
318314

319-
byteArrayOffset = byteArrayOffset + encodedSize;
320-
}
315+
if (!CoreUtils.isNullOrEmpty(envelopMessage.getGroupId())) {
316+
message.setGroupId(envelopMessage.getGroupId());
317+
}
321318

322-
return send(bytes, byteArrayOffset, AmqpConstants.AMQP_BATCH_MESSAGE_FORMAT, deliveryState);
323-
}).then();
319+
final int size = messageSerializer.getSize(message);
320+
final int allocationSize = Math.min(size + MAX_AMQP_HEADER_SIZE_BYTES, maxMessageSize);
321+
final byte[] encodedBytes = new byte[allocationSize];
322+
final int encodedSize = message.encode(encodedBytes, 0, allocationSize);
323+
// This copyOf copying is just few bytes for envelop.
324+
return Arrays.copyOf(encodedBytes, encodedSize);
325+
}
326+
327+
private byte[] batchBinaryDataSectionBytes(Message sectionMessage, int maxMessageSize) {
328+
final int size = messageSerializer.getSize(sectionMessage);
329+
final int allocationSize = Math.min(size + MAX_AMQP_HEADER_SIZE_BYTES, maxMessageSize);
330+
final byte[] encodedBytes = new byte[allocationSize];
331+
final int encodedSize = sectionMessage.encode(encodedBytes, 0, allocationSize);
332+
333+
final Message message = Proton.message();
334+
final Data binaryData = new Data(new Binary(encodedBytes, 0, encodedSize));
335+
message.setBody(binaryData);
336+
final int binaryRawSize = binaryData.getValue().getLength();
337+
// Precompute the "amqp:data:binary" encoded size -
338+
final int binaryEncodedSize = binaryEncodedSize(binaryRawSize);
339+
// ^ this pre-computation avoids allocating byte[] 'arr1' of estimated encoded size (to pass to message.encode(arr1,))
340+
// and a second allocation of byte[] 'arr2' with exact encoded size (returned from message.encode(arr1,)) then
341+
// copying encoded size bytes from 'arr1' to 'arr2'. Skipping extra allocations and CPU cycles for copying.
342+
final byte[] binaryEncodedBytes = new byte[binaryEncodedSize];
343+
message.encode(binaryEncodedBytes, 0, binaryEncodedSize);
344+
return binaryEncodedBytes;
345+
}
346+
347+
private Mono<Void> batchBufferOverflowError(int maxMessageSize) {
348+
return FluxUtil.monoError(logger, new AmqpException(false, AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED,
349+
String.format(Locale.US, "Size of the payload exceeded maximum message size: %s kb", maxMessageSize / 1024),
350+
new BufferOverflowException(), handler.getErrorContext(sender)));
351+
}
352+
353+
/**
354+
* Compute the encoded size when encoding a binary data of given size per Amqp 1.0 spec "amqp:data:binary" format.
355+
*
356+
* @param binaryRawSize the length of the binary data.
357+
* @return the encoded size.
358+
*/
359+
private int binaryEncodedSize(int binaryRawSize) {
360+
if (binaryRawSize <= 255) {
361+
// [0x00,0x53,0x75,0xa0,{byte(Data.Binary.Length)},{Data.Binary.bytes}]
362+
// The AMQP 1.0 spec format ^ for amqp:data:binary when the raw bytes length is <= 255.
363+
return 5 + binaryRawSize;
364+
} else {
365+
// [0x00,0x53,0x75,0xb0,{int(Data.Binary.Length)},{Data.Binary.bytes}]
366+
// The AMQP 1.0 spec format ^ for amqp:data:binary when the raw bytes length is > 255.
367+
return 8 + binaryRawSize;
368+
}
324369
}
325370

326371
@Override
@@ -457,16 +502,24 @@ Mono<Void> isClosed() {
457502

458503
@Override
459504
public Mono<DeliveryState> send(byte[] bytes, int arrayOffset, int messageFormat, DeliveryState deliveryState) {
460-
final Flux<EndpointState> activeEndpointFlux = RetryUtil.withRetry(
461-
handler.getEndpointStates().takeUntil(state -> state == EndpointState.ACTIVE), retryOptions,
462-
activeTimeoutMessage);
463-
464-
return activeEndpointFlux.then(Mono.create(sink -> {
505+
return onEndpointActive().then(Mono.create(sink -> {
465506
sendWork(new RetriableWorkItem(bytes, arrayOffset, messageFormat, sink, retryOptions.getTryTimeout(),
466507
deliveryState, metricsProvider));
467508
}));
468509
}
469510

511+
Mono<DeliveryState> send(ReadableBuffer buffer, int messageFormat, DeliveryState deliveryState) {
512+
return onEndpointActive().then(Mono.create(sink -> {
513+
sendWork(new RetriableWorkItem(buffer, messageFormat, sink, retryOptions.getTryTimeout(),
514+
deliveryState, metricsProvider));
515+
}));
516+
}
517+
518+
private Flux<EndpointState> onEndpointActive() {
519+
return RetryUtil.withRetry(handler.getEndpointStates().takeUntil(state -> state == EndpointState.ACTIVE),
520+
retryOptions, activeTimeoutMessage);
521+
}
522+
470523
/**
471524
* Add the work item in pending send to be processed on {@link ReactorDispatcher} thread.
472525
*
@@ -536,10 +589,7 @@ private void processSendWork() {
536589
if (workItem.isDeliveryStateProvided()) {
537590
delivery.disposition(workItem.getDeliveryState());
538591
}
539-
sentMsgSize = sender.send(workItem.getMessage(), 0, workItem.getEncodedMessageSize());
540-
assert sentMsgSize == workItem.getEncodedMessageSize()
541-
: "Contract of the ProtonJ library for Sender. Send API changed";
542-
592+
workItem.send(sender);
543593
linkAdvance = sender.advance();
544594
} catch (Exception exception) {
545595
sendException = exception;

sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetriableWorkItem.java

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
package com.azure.core.amqp.implementation;
55

66
import org.apache.qpid.proton.amqp.transport.DeliveryState;
7+
import org.apache.qpid.proton.codec.ReadableBuffer;
8+
import org.apache.qpid.proton.engine.Sender;
79
import reactor.core.publisher.MonoSink;
810

911
import java.time.Duration;
@@ -17,7 +19,8 @@ class RetriableWorkItem {
1719
private final AtomicInteger retryAttempts = new AtomicInteger();
1820
private final MonoSink<DeliveryState> monoSink;
1921
private final TimeoutTracker timeoutTracker;
20-
private final byte[] amqpMessage;
22+
private final ReadableBuffer encodedBuffer;
23+
private final byte[] encodedBytes;
2124
private final int messageFormat;
2225
private final int encodedMessageSize;
2326
private final DeliveryState deliveryState;
@@ -27,27 +30,30 @@ class RetriableWorkItem {
2730
private final AmqpMetricsProvider metricsProvider;
2831
private long tryStartTime = 0;
2932

30-
RetriableWorkItem(byte[] amqpMessage, int encodedMessageSize, int messageFormat, MonoSink<DeliveryState> monoSink,
31-
Duration timeout, DeliveryState deliveryState, AmqpMetricsProvider metricsProvider) {
32-
this(amqpMessage, encodedMessageSize, messageFormat, monoSink, new TimeoutTracker(timeout,
33-
false), deliveryState, metricsProvider);
33+
RetriableWorkItem(ReadableBuffer buffer, int messageFormat, MonoSink<DeliveryState> monoSink, Duration timeout,
34+
DeliveryState deliveryState, AmqpMetricsProvider metricsProvider) {
35+
this.encodedBuffer = buffer;
36+
this.encodedBytes = null;
37+
this.encodedMessageSize = buffer.remaining();
38+
this.messageFormat = messageFormat;
39+
this.monoSink = monoSink;
40+
this.timeoutTracker = new TimeoutTracker(timeout, false);
41+
this.deliveryState = deliveryState;
42+
this.metricsProvider = metricsProvider;
3443
}
3544

36-
private RetriableWorkItem(byte[] amqpMessage, int encodedMessageSize, int messageFormat, MonoSink<DeliveryState>
37-
monoSink, TimeoutTracker timeout, DeliveryState deliveryState, AmqpMetricsProvider metricsProvider) {
38-
this.amqpMessage = amqpMessage;
45+
RetriableWorkItem(byte[] bytes, int encodedMessageSize, int messageFormat, MonoSink<DeliveryState> monoSink, Duration timeout,
46+
DeliveryState deliveryState, AmqpMetricsProvider metricsProvider) {
47+
this.encodedBytes = bytes;
48+
this.encodedBuffer = null;
3949
this.encodedMessageSize = encodedMessageSize;
4050
this.messageFormat = messageFormat;
4151
this.monoSink = monoSink;
42-
this.timeoutTracker = timeout;
52+
this.timeoutTracker = new TimeoutTracker(timeout, false);
4353
this.deliveryState = deliveryState;
4454
this.metricsProvider = metricsProvider;
4555
}
4656

47-
byte[] getMessage() {
48-
return amqpMessage;
49-
}
50-
5157
DeliveryState getDeliveryState() {
5258
return deliveryState;
5359
}
@@ -108,6 +114,17 @@ boolean isWaitingForAck() {
108114
return this.waitingForAck;
109115
}
110116

117+
void send(Sender sender) {
118+
final int sentMsgSize;
119+
if (encodedBytes != null) {
120+
sentMsgSize = sender.send(encodedBytes, 0, encodedMessageSize);
121+
} else {
122+
encodedBuffer.rewind();
123+
sentMsgSize = sender.send(encodedBuffer);
124+
}
125+
assert sentMsgSize == encodedMessageSize : "Contract of the ProtonJ library for Sender. Send API changed";
126+
}
127+
111128
private void reportMetrics(DeliveryState deliveryState) {
112129
if (metricsProvider.isSendDeliveryEnabled()) {
113130
metricsProvider.recordSend(tryStartTime, deliveryState == null ? null : deliveryState.getType());

sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSenderTest.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
2626
import org.apache.qpid.proton.amqp.transport.DeliveryState;
2727
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
28+
import org.apache.qpid.proton.codec.ReadableBuffer;
2829
import org.apache.qpid.proton.engine.Delivery;
2930
import org.apache.qpid.proton.engine.EndpointState;
3031
import org.apache.qpid.proton.engine.Sender;
@@ -107,7 +108,7 @@ public class ReactorSenderTest {
107108
private Scheduler scheduler;
108109

109110
@Captor
110-
private ArgumentCaptor<byte[]> messageBytesCaptor;
111+
private ArgumentCaptor<ReadableBuffer> messageBufferCaptor;
111112

112113
@Captor
113114
private ArgumentCaptor<DeliveryState> deliveryStateArgumentCaptor;
@@ -318,7 +319,7 @@ public void testSendBatch() {
318319
reactorProvider, tokenManager, messageSerializer, options, scheduler, AmqpMetricsProvider.noop());
319320
final ReactorSender spyReactorSender = spy(reactorSender);
320321

321-
doReturn(Mono.empty()).when(spyReactorSender).send(any(byte[].class), anyInt(), anyInt(), isNull());
322+
doReturn(Mono.empty()).when(spyReactorSender).send(any(ReadableBuffer.class), anyInt(), isNull());
322323

323324
// Act
324325
StepVerifier.create(spyReactorSender.send(Arrays.asList(message, message2)))
@@ -330,14 +331,13 @@ public void testSendBatch() {
330331

331332
// Assert
332333
verify(sender, times(1)).getRemoteMaxMessageSize();
333-
verify(spyReactorSender, times(2)).send(messageBytesCaptor.capture(), anyInt(),
334-
anyInt(), isNull());
334+
verify(spyReactorSender, times(2)).send(messageBufferCaptor.capture(), anyInt(), isNull());
335335

336-
assertFalse(messageBytesCaptor.getAllValues().isEmpty());
336+
assertFalse(messageBufferCaptor.getAllValues().isEmpty());
337337

338-
messageBytesCaptor.getAllValues().forEach(delivery -> {
338+
messageBufferCaptor.getAllValues().forEach(delivery -> {
339339
final Message actual = Proton.message();
340-
actual.decode(delivery, 0, delivery.length);
340+
actual.decode(delivery);
341341

342342
final Object actualMessageId = actual.getMessageId();
343343
final String actualGroupId = actual.getGroupId();

0 commit comments

Comments
 (0)