From 223c745d5c0fb630f2b3fd384b7c7ee65d6595ff Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Tue, 13 Dec 2022 18:10:18 +0100 Subject: [PATCH 01/10] [improvement] Use Pooled Netty Direct ByteBufs for Response serialisation --- .../common/requests/KopResponseUtils.java | 34 +++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) diff --git a/kafka-impl/src/main/java/org/apache/kafka/common/requests/KopResponseUtils.java b/kafka-impl/src/main/java/org/apache/kafka/common/requests/KopResponseUtils.java index aa9de7a32c..5bf37b67c5 100644 --- a/kafka-impl/src/main/java/org/apache/kafka/common/requests/KopResponseUtils.java +++ b/kafka-impl/src/main/java/org/apache/kafka/common/requests/KopResponseUtils.java @@ -14,9 +14,15 @@ package org.apache.kafka.common.requests; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; +import io.netty.buffer.ByteBufOutputStream; +import java.io.DataOutputStream; import java.nio.ByteBuffer; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.protocol.DataOutputStreamWritable; +import org.apache.kafka.common.protocol.Message; +import org.apache.kafka.common.protocol.ObjectSerializationCache; +import org.apache.kafka.common.protocol.Writable; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; /** * Provide util classes to access protected fields in kafka structures. @@ -34,7 +40,11 @@ public class KopResponseUtils { public static ByteBuf serializeResponse(short version, ResponseHeader responseHeader, AbstractResponse response) { - return Unpooled.wrappedBuffer(response.serializeWithHeader(responseHeader, version)); + return serializeWithHeader(response, responseHeader, version); + } + + private static ByteBuf serializeWithHeader(AbstractResponse response, ResponseHeader header, short version) { + return serialize(header.data(), header.headerVersion(), response.data(), version); } public static ByteBuffer serializeRequest(RequestHeader requestHeader, AbstractRequest request) { @@ -42,4 +52,24 @@ public static ByteBuffer serializeRequest(RequestHeader requestHeader, AbstractR request.data(), request.version()); } + public static ByteBuf serialize( + Message header, + short headerVersion, + Message apiMessage, + short apiVersion + ) { + ObjectSerializationCache cache = new ObjectSerializationCache(); + + int headerSize = header.size(cache, headerVersion); + int messageSize = apiMessage.size(cache, apiVersion); + ByteBuf result = PulsarByteBufAllocator.DEFAULT.directBuffer(headerSize + messageSize); + + Writable writable = new DataOutputStreamWritable(new DataOutputStream(new ByteBufOutputStream(result))); + + header.write(writable, cache, headerVersion); + apiMessage.write(writable, cache, apiVersion); + + return result; + } + } From 42b8052fa8d67b43045bd75f2daab9a7a2db120e Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Tue, 13 Dec 2022 17:50:20 -0600 Subject: [PATCH 02/10] [fix] DataOutputStreamWritable#writeByteBuffer offset value --- .../requests/DataOutputStreamWritable.java | 43 +++++++++++++++++++ .../common/requests/KopResponseUtils.java | 1 - 2 files changed, 43 insertions(+), 1 deletion(-) create mode 100644 kafka-impl/src/main/java/org/apache/kafka/common/requests/DataOutputStreamWritable.java diff --git a/kafka-impl/src/main/java/org/apache/kafka/common/requests/DataOutputStreamWritable.java b/kafka-impl/src/main/java/org/apache/kafka/common/requests/DataOutputStreamWritable.java new file mode 100644 index 0000000000..796b3002c3 --- /dev/null +++ b/kafka-impl/src/main/java/org/apache/kafka/common/requests/DataOutputStreamWritable.java @@ -0,0 +1,43 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.utils.Utils; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * This class is necessary to bypass a bug in + * DataOutputStreamWritable + */ +public class DataOutputStreamWritable extends org.apache.kafka.common.protocol.DataOutputStreamWritable { + public DataOutputStreamWritable(DataOutputStream out) { + super(out); + } + @Override + public void writeByteBuffer(ByteBuffer buf) { + try { + if (buf.hasArray()) { + out.write(buf.array(), buf.arrayOffset(), buf.limit()); + } else { + byte[] bytes = Utils.toArray(buf); + out.write(bytes); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/kafka-impl/src/main/java/org/apache/kafka/common/requests/KopResponseUtils.java b/kafka-impl/src/main/java/org/apache/kafka/common/requests/KopResponseUtils.java index 5bf37b67c5..96d9fc0aa3 100644 --- a/kafka-impl/src/main/java/org/apache/kafka/common/requests/KopResponseUtils.java +++ b/kafka-impl/src/main/java/org/apache/kafka/common/requests/KopResponseUtils.java @@ -18,7 +18,6 @@ import java.io.DataOutputStream; import java.nio.ByteBuffer; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.common.protocol.DataOutputStreamWritable; import org.apache.kafka.common.protocol.Message; import org.apache.kafka.common.protocol.ObjectSerializationCache; import org.apache.kafka.common.protocol.Writable; From 8e40ffe3755fff63e26128c350f86aa930ee8a2e Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Wed, 14 Dec 2022 09:49:40 +0100 Subject: [PATCH 03/10] [improvement] Used more Pooled ByteBufs --- .../handlers/kop/KafkaCommandDecoder.java | 20 ++++++++++++++----- .../transaction/PendingRequest.java | 3 ++- .../TransactionMarkerChannelHandler.java | 2 +- .../common/requests/KopResponseUtils.java | 4 ++-- .../handlers/kop/KafkaCommonTestUtils.java | 8 +++----- .../handlers/kop/KafkaRequestHandlerTest.java | 9 ++++----- 6 files changed, 27 insertions(+), 19 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java index d1d8618278..4342e04327 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java @@ -593,10 +593,6 @@ protected void handleError(KafkaHeaderAndRequest kafkaHeaderAndRequest, protected abstract void handleCreatePartitions(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture response); - protected abstract void - handleDescribeCluster(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture response); - - public static class KafkaHeaderAndRequest { private static final String DEFAULT_CLIENT_HOST = ""; @@ -606,6 +602,8 @@ public static class KafkaHeaderAndRequest { private final ByteBuf buffer; private final SocketAddress remoteAddress; + private final AtomicBoolean released = new AtomicBoolean(); + public KafkaHeaderAndRequest(RequestHeader header, AbstractRequest request, ByteBuf buffer, @@ -617,6 +615,9 @@ public KafkaHeaderAndRequest(RequestHeader header, } public ByteBuf getBuffer() { + if (released.get()) { + throw new IllegalStateException("Already released"); + } return buffer; } @@ -650,8 +651,17 @@ public String toString() { this.header, this.request, this.remoteAddress); } + public void bufferReleased() { + if (!released.compareAndSet(false, true)) { + throw new IllegalStateException("Already released"); + } + } + public void close() { - ReferenceCountUtil.safeRelease(this.buffer); + if (!released.compareAndSet(false, true)) { + return; + } + ReferenceCountUtil.safeRelease(this.buffer); } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/PendingRequest.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/PendingRequest.java index c7e5ef6d60..d718d9424c 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/PendingRequest.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/PendingRequest.java @@ -13,6 +13,7 @@ */ package io.streamnative.pulsar.handlers.kop.coordinator.transaction; +import io.netty.buffer.ByteBuf; import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; @@ -41,7 +42,7 @@ public PendingRequest(final ApiKeys apiKeys, this.responseConsumerHandler = responseConsumerHandler; } - public ByteBuffer serialize() { + public ByteBuf serialize() { return KopResponseUtils.serializeRequest(requestHeader, request); } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelHandler.java index 39a75c5af6..95a1237c6e 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelHandler.java @@ -73,7 +73,7 @@ public TransactionMarkerChannelHandler( private void enqueueRequest(ChannelHandlerContext channel, PendingRequest pendingRequest) { final long correlationId = pendingRequest.getCorrelationId(); pendingRequestMap.put(correlationId, pendingRequest); - channel.writeAndFlush(Unpooled.wrappedBuffer(pendingRequest.serialize())).addListener(writeFuture -> { + channel.writeAndFlush(pendingRequest.serialize()).addListener(writeFuture -> { if (!writeFuture.isSuccess()) { pendingRequest.completeExceptionally(writeFuture.cause()); pendingRequestMap.remove(correlationId); diff --git a/kafka-impl/src/main/java/org/apache/kafka/common/requests/KopResponseUtils.java b/kafka-impl/src/main/java/org/apache/kafka/common/requests/KopResponseUtils.java index 96d9fc0aa3..a59803c4d2 100644 --- a/kafka-impl/src/main/java/org/apache/kafka/common/requests/KopResponseUtils.java +++ b/kafka-impl/src/main/java/org/apache/kafka/common/requests/KopResponseUtils.java @@ -46,8 +46,8 @@ private static ByteBuf serializeWithHeader(AbstractResponse response, ResponseHe return serialize(header.data(), header.headerVersion(), response.data(), version); } - public static ByteBuffer serializeRequest(RequestHeader requestHeader, AbstractRequest request) { - return RequestUtils.serialize(requestHeader.data(), requestHeader.headerVersion(), + public static ByteBuf serializeRequest(RequestHeader requestHeader, AbstractRequest request) { + return serialize(requestHeader.data(), requestHeader.headerVersion(), request.data(), request.version()); } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaCommonTestUtils.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaCommonTestUtils.java index 1a28d88c3e..4ecb2a8c4d 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaCommonTestUtils.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaCommonTestUtils.java @@ -115,15 +115,13 @@ public static KafkaCommandDecoder.KafkaHeaderAndRequest buildRequest(AbstractReq RequestHeader mockHeader = new RequestHeader(builder.apiKey(), request.version(), "dummy", 1233); - ByteBuffer serializedRequest = KopResponseUtils.serializeRequest(mockHeader, request); + ByteBuf byteBuf = KopResponseUtils.serializeRequest(mockHeader, request); - ByteBuf byteBuf = Unpooled.copiedBuffer(serializedRequest); - - RequestHeader header = RequestHeader.parse(serializedRequest); + RequestHeader header = RequestHeader.parse(byteBuf.nioBuffer()); ApiKeys apiKey = header.apiKey(); short apiVersion = header.apiVersion(); - AbstractRequest body = AbstractRequest.parseRequest(apiKey, apiVersion, serializedRequest).request; + AbstractRequest body = AbstractRequest.parseRequest(apiKey, apiVersion, byteBuf.nioBuffer()).request; return new KafkaCommandDecoder.KafkaHeaderAndRequest(header, body, byteBuf, serviceAddress); } } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java index 134f46c548..a41e50cbb8 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java @@ -188,17 +188,16 @@ public void testByteBufToRequest() { correlationId); // 1. serialize request into ByteBuf - ByteBuffer serializedRequest = KopResponseUtils.serializeRequest(header, apiVersionsRequest); - int size = serializedRequest.remaining(); - ByteBuf inputBuf = Unpooled.buffer(size); - inputBuf.writeBytes(serializedRequest); + ByteBuf serializedRequest = KopResponseUtils.serializeRequest(header, apiVersionsRequest); // 2. turn Bytebuf into KafkaHeaderAndRequest. - KafkaHeaderAndRequest request = handler.byteBufToRequest(inputBuf, null); + KafkaHeaderAndRequest request = handler.byteBufToRequest(serializedRequest, null); // 3. verify byteBufToRequest works well. assertEquals(request.getHeader().data(), header.data()); assertTrue(request.getRequest() instanceof ApiVersionsRequest); + + request.close(); } From 139797fd1d3085214fcdcb5d3e5c61501fdb7f11 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Tue, 7 Mar 2023 13:57:56 +0100 Subject: [PATCH 04/10] restore after conflict --- .../streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java index 4342e04327..919de3f2ea 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java @@ -592,6 +592,8 @@ protected void handleError(KafkaHeaderAndRequest kafkaHeaderAndRequest, protected abstract void handleCreatePartitions(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture response); + protected abstract void + handleDescribeCluster(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture response); public static class KafkaHeaderAndRequest { From d775ac1d0f316d5505511bb02a52ec29bb01751c Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Tue, 7 Mar 2023 13:58:36 +0100 Subject: [PATCH 05/10] format --- .../streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java index 919de3f2ea..62d2e5facf 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java @@ -592,8 +592,10 @@ protected void handleError(KafkaHeaderAndRequest kafkaHeaderAndRequest, protected abstract void handleCreatePartitions(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture response); + protected abstract void handleDescribeCluster(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture response); + public static class KafkaHeaderAndRequest { From d01a6c5bf9540ee4997092335919a2e5caa8a45c Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Tue, 7 Mar 2023 13:59:57 +0100 Subject: [PATCH 06/10] backport from S4K --- .../pulsar/handlers/kop/KafkaCommandDecoder.java | 1 - .../kafka/common/requests/KopResponseUtils.java | 16 ++++++---------- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java index 62d2e5facf..9730eaf0ce 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java @@ -596,7 +596,6 @@ protected void handleError(KafkaHeaderAndRequest kafkaHeaderAndRequest, protected abstract void handleDescribeCluster(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture response); - public static class KafkaHeaderAndRequest { private static final String DEFAULT_CLIENT_HOST = ""; diff --git a/kafka-impl/src/main/java/org/apache/kafka/common/requests/KopResponseUtils.java b/kafka-impl/src/main/java/org/apache/kafka/common/requests/KopResponseUtils.java index a59803c4d2..9b65c5fe49 100644 --- a/kafka-impl/src/main/java/org/apache/kafka/common/requests/KopResponseUtils.java +++ b/kafka-impl/src/main/java/org/apache/kafka/common/requests/KopResponseUtils.java @@ -14,14 +14,12 @@ package org.apache.kafka.common.requests; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufOutputStream; -import java.io.DataOutputStream; +import io.netty.buffer.Unpooled; import java.nio.ByteBuffer; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.Message; import org.apache.kafka.common.protocol.ObjectSerializationCache; -import org.apache.kafka.common.protocol.Writable; -import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; /** * Provide util classes to access protected fields in kafka structures. @@ -61,14 +59,12 @@ public static ByteBuf serialize( int headerSize = header.size(cache, headerVersion); int messageSize = apiMessage.size(cache, apiVersion); - ByteBuf result = PulsarByteBufAllocator.DEFAULT.directBuffer(headerSize + messageSize); - - Writable writable = new DataOutputStreamWritable(new DataOutputStream(new ByteBufOutputStream(result))); - + ByteBuffer result = ByteBuffer.allocate(headerSize + messageSize); + ByteBufferAccessor writable = new ByteBufferAccessor(result); header.write(writable, cache, headerVersion); apiMessage.write(writable, cache, apiVersion); - - return result; + result.flip(); + return Unpooled.wrappedBuffer(result); } } From 7e0ed55ef1d18bf21a6bcae25b6420d2552fb115 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Tue, 7 Mar 2023 14:03:46 +0100 Subject: [PATCH 07/10] remove useless class --- .../handlers/kop/KafkaCommandDecoder.java | 2 +- .../requests/DataOutputStreamWritable.java | 43 ------------------- 2 files changed, 1 insertion(+), 44 deletions(-) delete mode 100644 kafka-impl/src/main/java/org/apache/kafka/common/requests/DataOutputStreamWritable.java diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java index 9730eaf0ce..28b8b843c9 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java @@ -595,7 +595,7 @@ protected void handleError(KafkaHeaderAndRequest kafkaHeaderAndRequest, protected abstract void handleDescribeCluster(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture response); - + public static class KafkaHeaderAndRequest { private static final String DEFAULT_CLIENT_HOST = ""; diff --git a/kafka-impl/src/main/java/org/apache/kafka/common/requests/DataOutputStreamWritable.java b/kafka-impl/src/main/java/org/apache/kafka/common/requests/DataOutputStreamWritable.java deleted file mode 100644 index 796b3002c3..0000000000 --- a/kafka-impl/src/main/java/org/apache/kafka/common/requests/DataOutputStreamWritable.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.common.requests; - -import org.apache.kafka.common.utils.Utils; - -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; - -/** - * This class is necessary to bypass a bug in - * DataOutputStreamWritable - */ -public class DataOutputStreamWritable extends org.apache.kafka.common.protocol.DataOutputStreamWritable { - public DataOutputStreamWritable(DataOutputStream out) { - super(out); - } - @Override - public void writeByteBuffer(ByteBuffer buf) { - try { - if (buf.hasArray()) { - out.write(buf.array(), buf.arrayOffset(), buf.limit()); - } else { - byte[] bytes = Utils.toArray(buf); - out.write(bytes); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - } -} From e5eacf88f66657b9517ed42dff1ee513bbfdbac6 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Tue, 7 Mar 2023 14:07:39 +0100 Subject: [PATCH 08/10] style --- .../transaction/TransactionMarkerChannelHandler.java | 1 - .../streamnative/pulsar/handlers/kop/KafkaCommonTestUtils.java | 2 -- 2 files changed, 3 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelHandler.java index 95a1237c6e..4ed3156755 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelHandler.java @@ -17,7 +17,6 @@ import static org.apache.kafka.common.requests.WriteTxnMarkersRequest.TxnMarkerEntry; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.streamnative.pulsar.handlers.kop.security.PlainSaslServer; diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaCommonTestUtils.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaCommonTestUtils.java index 4ecb2a8c4d..caedbc1572 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaCommonTestUtils.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaCommonTestUtils.java @@ -14,9 +14,7 @@ package io.streamnative.pulsar.handlers.kop; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import java.net.SocketAddress; -import java.nio.ByteBuffer; import java.util.Collections; import java.util.List; import java.util.Optional; From 0cb696d2873f735b7675b0b0d2ad9e76b551c6dc Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Tue, 7 Mar 2023 15:28:02 +0100 Subject: [PATCH 09/10] Fix KafkaCommonTestUtils --- .../handlers/kop/KafkaCommonTestUtils.java | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaCommonTestUtils.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaCommonTestUtils.java index caedbc1572..6ddaacbe49 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaCommonTestUtils.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaCommonTestUtils.java @@ -13,8 +13,11 @@ */ package io.streamnative.pulsar.handlers.kop; +import static org.testng.Assert.assertEquals; + import io.netty.buffer.ByteBuf; import java.net.SocketAddress; +import java.nio.ByteBuffer; import java.util.Collections; import java.util.List; import java.util.Optional; @@ -106,20 +109,24 @@ public static ListOffsetsResponseData.ListOffsetsPartitionResponse getListOffset return listOffsetsPartitionResponse; } - public static KafkaCommandDecoder.KafkaHeaderAndRequest buildRequest(AbstractRequest.Builder builder, - SocketAddress serviceAddress) { - AbstractRequest request = builder.build(builder.apiKey().latestVersion()); + SocketAddress serviceAddress) { + return buildRequest(builder, serviceAddress, builder.latestAllowedVersion()); + } + public static KafkaCommandDecoder.KafkaHeaderAndRequest buildRequest(AbstractRequest.Builder builder, + SocketAddress serviceAddress, short version) { + AbstractRequest request = builder.build(version); + assertEquals(version, request.version()); RequestHeader mockHeader = new RequestHeader(builder.apiKey(), request.version(), "dummy", 1233); ByteBuf byteBuf = KopResponseUtils.serializeRequest(mockHeader, request); - - RequestHeader header = RequestHeader.parse(byteBuf.nioBuffer()); + ByteBuffer byteBuffer = byteBuf.nioBuffer(); + RequestHeader header = RequestHeader.parse(byteBuffer); ApiKeys apiKey = header.apiKey(); short apiVersion = header.apiVersion(); - AbstractRequest body = AbstractRequest.parseRequest(apiKey, apiVersion, byteBuf.nioBuffer()).request; + AbstractRequest body = AbstractRequest.parseRequest(apiKey, apiVersion, byteBuffer).request; return new KafkaCommandDecoder.KafkaHeaderAndRequest(header, body, byteBuf, serviceAddress); } } From bed71a92dd8c8a47c99e1e822f292a13bd58c9e8 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 21 Mar 2023 11:06:36 +0800 Subject: [PATCH 10/10] Fix indent --- .../pulsar/handlers/kop/KafkaCommandDecoder.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java index 28b8b843c9..548ff7c159 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java @@ -661,10 +661,10 @@ public void bufferReleased() { } public void close() { - if (!released.compareAndSet(false, true)) { - return; - } - ReferenceCountUtil.safeRelease(this.buffer); + if (!released.compareAndSet(false, true)) { + return; + } + ReferenceCountUtil.safeRelease(this.buffer); } }