From 3602c9e826d903d97091af1cc608b9d88c1b8cf3 Mon Sep 17 00:00:00 2001 From: ran Date: Wed, 26 Jul 2023 16:34:23 +0800 Subject: [PATCH] [bugfix] AppendRecordsContext cannot be Recyclable (#1967) (cherry picked from commit 0b6fd77c0759c54d92ac5aee9b585d1c71c88712) ### Motivation AppendRecordsContext cannot be Recyclable. ### Modifications Don't recycle the AppendRecordsContext. Co-authored-by: Enrico Olivelli --- .../handlers/kop/KafkaRequestHandler.java | 2 - .../kop/storage/AppendRecordsContext.java | 38 +++++-------------- 2 files changed, 9 insertions(+), 31 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index 89d37ecb75..9cffb86d0b 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -875,7 +875,6 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar, PartitionLog.AppendOrigin.Client, appendRecordsContext ).whenComplete((response, ex) -> { - appendRecordsContext.recycle(); if (ex != null) { resultFuture.completeExceptionally(ex.getCause()); return; @@ -2449,7 +2448,6 @@ protected void handleWriteTxnMarkers(KafkaHeaderAndRequest kafkaHeaderAndRequest PartitionLog.AppendOrigin.Coordinator, appendRecordsContext ).whenComplete((result, ex) -> { - appendRecordsContext.recycle(); if (ex != null) { log.error("[{}] Append txn marker ({}) failed.", ctx.channel(), marker, ex); Map currentErrors = new HashMap<>(); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/AppendRecordsContext.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/AppendRecordsContext.java index b830792a47..42110b3de9 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/AppendRecordsContext.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/AppendRecordsContext.java @@ -14,59 +14,39 @@ package io.streamnative.pulsar.handlers.kop.storage; import io.netty.channel.ChannelHandlerContext; -import io.netty.util.Recycler; import io.streamnative.pulsar.handlers.kop.KafkaTopicManager; import io.streamnative.pulsar.handlers.kop.PendingTopicFutures; import java.util.Map; import java.util.function.Consumer; +import lombok.AllArgsConstructor; import lombok.Getter; +import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.TopicPartition; /** * AppendRecordsContext is use for pass parameters to ReplicaManager, to avoid long parameter lists. */ +@Slf4j +@AllArgsConstructor @Getter public class AppendRecordsContext { - private static final Recycler RECYCLER = new Recycler() { - protected AppendRecordsContext newObject(Handle handle) { - return new AppendRecordsContext(handle); - } - }; - - private final Recycler.Handle recyclerHandle; private KafkaTopicManager topicManager; private Consumer startSendOperationForThrottling; private Consumer completeSendOperationForThrottling; private Map pendingTopicFuturesMap; private ChannelHandlerContext ctx; - private AppendRecordsContext(Recycler.Handle recyclerHandle) { - this.recyclerHandle = recyclerHandle; - } - // recycler and get for this object public static AppendRecordsContext get(final KafkaTopicManager topicManager, final Consumer startSendOperationForThrottling, final Consumer completeSendOperationForThrottling, final Map pendingTopicFuturesMap, final ChannelHandlerContext ctx) { - AppendRecordsContext context = RECYCLER.get(); - context.topicManager = topicManager; - context.startSendOperationForThrottling = startSendOperationForThrottling; - context.completeSendOperationForThrottling = completeSendOperationForThrottling; - context.pendingTopicFuturesMap = pendingTopicFuturesMap; - context.ctx = ctx; - - return context; - } - - public void recycle() { - topicManager = null; - startSendOperationForThrottling = null; - completeSendOperationForThrottling = null; - pendingTopicFuturesMap = null; - recyclerHandle.recycle(this); - ctx = null; + return new AppendRecordsContext(topicManager, + startSendOperationForThrottling, + completeSendOperationForThrottling, + pendingTopicFuturesMap, + ctx); } }