From f735384f259fb32112565d446e195e348b3ea407 Mon Sep 17 00:00:00 2001
From: ChickenchickenLove <ojt90902@naver.com>
Date: Sun, 24 Mar 2024 21:36:56 +0900
Subject: [PATCH 1/5] Implementation to integrate ParallelConsumer.

---
 build.gradle                                  |   4 +
 .../annotation/EnableParallelConsumer.java    |  39 +++++++
 .../kafka/config/ParallelConsumerConfig.java  | 108 ++++++++++++++++++
 .../config/ParallelConsumerConfiguration.java |  47 ++++++++
 .../kafka/config/ParallelConsumerContext.java |  60 ++++++++++
 .../ParallelConsumerImportSelector.java       |  35 ++++++
 .../kafka/core/ParallelConsumerCallback.java  |  42 +++++++
 .../kafka/core/ParallelConsumerFactory.java   |  65 +++++++++++
 8 files changed, 400 insertions(+)
 create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableParallelConsumer.java
 create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfig.java
 create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfiguration.java
 create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerContext.java
 create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerImportSelector.java
 create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerCallback.java
 create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerFactory.java

diff --git a/build.gradle b/build.gradle
index bf86b4d892..5c9eae6758 100644
--- a/build.gradle
+++ b/build.gradle
@@ -73,6 +73,7 @@ ext {
 	springRetryVersion = '2.0.5'
 	springVersion = '6.1.5'
 	zookeeperVersion = '3.8.4'
+	parallelConsumerVersion = '0.5.2.8'
 
 	idPrefix = 'kafka'
 
@@ -289,6 +290,9 @@ project ('spring-kafka') {
 			exclude group: 'org.jetbrains.kotlin'
 		}
 
+		// Parallel Consumer
+		api "io.confluent.parallelconsumer:parallel-consumer-core:$parallelConsumerVersion"
+
 		// Spring Data projection message binding support
 		optionalApi ("org.springframework.data:spring-data-commons") {
 			exclude group: 'org.springframework'
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableParallelConsumer.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableParallelConsumer.java
new file mode 100644
index 0000000000..6966edbde3
--- /dev/null
+++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableParallelConsumer.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2014-2024 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.annotation;
+
+import org.springframework.context.annotation.Import;
+import org.springframework.kafka.config.ParallelConsumerConfiguration;
+import org.springframework.kafka.config.ParallelConsumerImportSelector;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * If you want to import {@link ParallelConsumerConfiguration} to your application,
+ * you just annotated {@link EnableParallelConsumer} to your spring application.
+ * @author ...
+ * @since 3.2.0
+ */
+
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+@Import(ParallelConsumerImportSelector.class)
+public @interface EnableParallelConsumer {
+}
\ No newline at end of file
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfig.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfig.java
new file mode 100644
index 0000000000..6abc17a387
--- /dev/null
+++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfig.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2014-2024 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.config;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.Consumer;
+
+import io.confluent.parallelconsumer.ParallelConsumerOptions;
+import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder;
+import org.springframework.util.StringUtils;
+import org.springframework.kafka.annotation.EnableParallelConsumer;
+
+/**
+ * ParallelConsumerConfig is for config of {@link io.confluent.parallelconsumer}.
+ * This will be registered as Spring Bean when {@link EnableParallelConsumer} is annotated to your spring application.
+ * @author ...
+ * @since 3.2.0
+ */
+
+public class ParallelConsumerConfig {
+
+
+	private static final String PARALLEL_CONSUMER_MAX_CONCURRENCY = "PARALLEL_CONSUMER_MAX_CONCURRENCY";
+	private static final String PARALLEL_CONSUMER_ORDERING = "PARALLEL_CONSUMER_ORDERING";
+	private static final String ALLOW_EAGER_PROCESSING_DURING_TRANSACTION_COMMIT = "ALLOW_EAGER_PROCESSING_DURING_TRANSACTION_COMMIT";
+	private static final String COMMIT_LOCK_ACQUISITION_TIMEOUT = "COMMIT_LOCK_ACQUISITION_TIMEOUT";
+	private static final String COMMIT_INTERVAL = "COMMIT_INTERVAL";
+	private final Map<String, String> properties = new HashMap<>();
+
+	public ParallelConsumerConfig() {
+
+		final String maxConcurrency = System.getenv(PARALLEL_CONSUMER_MAX_CONCURRENCY);
+		final String ordering = System.getenv(PARALLEL_CONSUMER_ORDERING);
+		final String allowEagerProcessingDuringTransactionCommit = System.getenv(ALLOW_EAGER_PROCESSING_DURING_TRANSACTION_COMMIT);
+		final String commitLockAcquisitionTimeout = System.getenv(COMMIT_LOCK_ACQUISITION_TIMEOUT);
+		final String commitInterval = System.getenv(COMMIT_INTERVAL);
+
+		this.properties.put(PARALLEL_CONSUMER_MAX_CONCURRENCY, maxConcurrency);
+		this.properties.put(PARALLEL_CONSUMER_ORDERING, ordering);
+		this.properties.put(ALLOW_EAGER_PROCESSING_DURING_TRANSACTION_COMMIT, allowEagerProcessingDuringTransactionCommit);
+		this.properties.put(COMMIT_LOCK_ACQUISITION_TIMEOUT, commitLockAcquisitionTimeout);
+		this.properties.put(COMMIT_INTERVAL, commitInterval);
+	}
+
+	private ProcessingOrder toOrder(String order) {
+		return switch (order) {
+			case "partition" -> ProcessingOrder.PARTITION;
+			case "unordered" -> ProcessingOrder.UNORDERED;
+			default -> ProcessingOrder.KEY; // Confluent Consumer Default Policy
+		};
+	}
+
+	public <K,V> ParallelConsumerOptions<K, V> toConsumerOptions(Consumer<K, V> consumer) {
+
+		ParallelConsumerOptions.ParallelConsumerOptionsBuilder<K, V> builder = ParallelConsumerOptions.builder();
+		builder.consumer(consumer);
+
+		final String maxConcurrencyString = this.properties.get(PARALLEL_CONSUMER_MAX_CONCURRENCY);
+		final String orderingString = this.properties.get(PARALLEL_CONSUMER_ORDERING);
+		final String allowEagerProcessingDuringTransactionCommitString = this.properties.get(ALLOW_EAGER_PROCESSING_DURING_TRANSACTION_COMMIT);
+		final String commitLockAcquisitionTimeoutString = this.properties.get(COMMIT_LOCK_ACQUISITION_TIMEOUT);
+		final String commitIntervalString = this.properties.get(COMMIT_INTERVAL);
+
+		if (StringUtils.hasText(maxConcurrencyString)) {
+			final Integer maxConcurrency = Integer.valueOf(maxConcurrencyString);
+			builder.maxConcurrency(maxConcurrency);
+		}
+
+		if (StringUtils.hasText(orderingString)) {
+			final ProcessingOrder processingOrder = toOrder(orderingString);
+			builder.ordering(processingOrder);
+		}
+
+		if (StringUtils.hasText(allowEagerProcessingDuringTransactionCommitString)) {
+			final Boolean allowEagerProcessingDuringTransactionCommit = Boolean.valueOf(allowEagerProcessingDuringTransactionCommitString);
+			builder.allowEagerProcessingDuringTransactionCommit(allowEagerProcessingDuringTransactionCommit);
+		}
+
+		if (StringUtils.hasText(commitLockAcquisitionTimeoutString)) {
+			final Long commitLockAcquisitionTimeout = Long.valueOf(commitLockAcquisitionTimeoutString);
+			builder.commitLockAcquisitionTimeout(Duration.ofSeconds(commitLockAcquisitionTimeout));
+		}
+
+		if (StringUtils.hasText(commitIntervalString)) {
+			final Long commitInterval = Long.valueOf(commitIntervalString);
+			builder.commitInterval(Duration.ofMillis(commitInterval));
+		}
+
+		return builder.build();
+	}
+}
\ No newline at end of file
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfiguration.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfiguration.java
new file mode 100644
index 0000000000..a52bdb1de4
--- /dev/null
+++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfiguration.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2014-2024 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.config;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.core.ParallelConsumerCallback;
+import org.springframework.kafka.core.ParallelConsumerFactory;
+import org.springframework.kafka.annotation.EnableParallelConsumer;
+
+/**
+ * If User decide to use parallelConsumer on SpringKafka, User should import this class to their ComponentScan scopes.
+ * If so, this class will register both {@link ParallelConsumerContext} and {@link ParallelConsumerFactory} as Spring Bean.
+ * User has responsibility
+ *   1. annotated {@link EnableParallelConsumer} on their spring application
+ *   2. register ConcreteClass of {@link ParallelConsumerCallback}.
+ * @author ...
+ * @since 3.2.0
+ */
+
+public class ParallelConsumerConfiguration<K, V> {
+
+	@Bean(name = ParallelConsumerContext.DEFAULT_BEAN_NAME)
+	public ParallelConsumerContext<K,V> parallelConsumerContext(ParallelConsumerCallback<K, V> parallelConsumerCallback) {
+		return new ParallelConsumerContext(parallelConsumerCallback);
+	}
+
+	@Bean(name = ParallelConsumerFactory.DEFAULT_BEAN_NAME)
+	public ParallelConsumerFactory<K,V> parallelConsumerFactory(DefaultKafkaConsumerFactory<K,V> consumerFactory,
+																ParallelConsumerContext<K,V> parallelConsumerContext) {
+		return new ParallelConsumerFactory(parallelConsumerContext, consumerFactory);
+	}
+}
\ No newline at end of file
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerContext.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerContext.java
new file mode 100644
index 0000000000..b07a35cdc7
--- /dev/null
+++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerContext.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2014-2024 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.config;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.springframework.kafka.core.ParallelConsumerCallback;
+
+import io.confluent.parallelconsumer.ParallelConsumerOptions;
+import io.confluent.parallelconsumer.ParallelStreamProcessor;
+
+/**
+ * This class is for aggregating all related with ParallelConsumer.
+ * @author ...
+ * @since 3.2.0
+ */
+
+
+public class ParallelConsumerContext<K,V> {
+
+	public static final String DEFAULT_BEAN_NAME = "parallelConsumerContext";
+	private final ParallelConsumerConfig parallelConsumerConfig;
+	private final ParallelConsumerCallback<K,V> parallelConsumerCallback;
+	private ParallelStreamProcessor<K, V> processor;
+
+	public ParallelConsumerContext(ParallelConsumerCallback<K, V> callback) {
+		this.parallelConsumerConfig = new ParallelConsumerConfig();
+		this.parallelConsumerCallback = callback;
+	}
+
+
+	public ParallelConsumerCallback<K,V> parallelConsumerCallback() {
+		return this.parallelConsumerCallback;
+	}
+
+
+	public ParallelStreamProcessor<K, V> createConsumer(Consumer<K,V> consumer) {
+		final ParallelConsumerOptions<K, V> options = parallelConsumerConfig.toConsumerOptions(consumer);
+		this.processor = ParallelStreamProcessor.createEosStreamProcessor(options);
+		return this.processor;
+	}
+
+	public void stopParallelConsumer() {
+		this.processor.close();
+	}
+
+}
\ No newline at end of file
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerImportSelector.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerImportSelector.java
new file mode 100644
index 0000000000..9cc32b013c
--- /dev/null
+++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerImportSelector.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2014-2024 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.config;
+
+import org.springframework.context.annotation.ImportSelector;
+import org.springframework.core.type.AnnotationMetadata;
+import org.springframework.kafka.annotation.EnableParallelConsumer;
+/**
+ * ParallelConsumerImportSelector is to register {@link ParallelConsumerConfiguration}.
+ * If you want to import {@link ParallelConsumerConfiguration} to your application,
+ * you just annotated {@link EnableParallelConsumer} to your spring application.
+ * @author ...
+ * @since 3.2.0
+ */
+
+public class ParallelConsumerImportSelector implements ImportSelector {
+	@Override
+	public String[] selectImports(AnnotationMetadata importingClassMetadata) {
+		return new String[]{ParallelConsumerConfiguration.class.getName()};
+	}
+}
\ No newline at end of file
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerCallback.java b/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerCallback.java
new file mode 100644
index 0000000000..7bdf7ae06d
--- /dev/null
+++ b/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerCallback.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2014-2024 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.core;
+
+import java.util.List;
+
+import io.confluent.parallelconsumer.PollContext;
+import org.springframework.kafka.config.ParallelConsumerContext;
+
+/**
+ * User should create ConcreteClass of this and register it as Spring Bean.
+ * Concrete class of ParallelConsumerCallback will be registered {@link ParallelConsumerContext},
+ * and then it will be used in {@link ParallelConsumerFactory} when ParallelConsumerFactory start.
+ * @author ...
+ * @since 3.2.0
+ */
+
+public interface ParallelConsumerCallback<K, V> {
+
+	/**
+	 * This is for {@link ParallelConsumerFactory} and {@link ParallelConsumerContext}.
+	 * ParallelConsumer will process the consumed messages using this callback.
+	 * @param context context which Parallel Consumer produce
+	 * @return void.
+	 */
+	void accept(PollContext<K, V> context);
+	List<String> getTopics();
+}
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerFactory.java
new file mode 100644
index 0000000000..ab70a0503e
--- /dev/null
+++ b/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2014-2024 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.core;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.springframework.context.SmartLifecycle;
+import org.springframework.kafka.config.ParallelConsumerContext;
+
+import io.confluent.parallelconsumer.ParallelStreamProcessor;
+
+/**
+ * ParallelConsumerFactory will be started and closed by Spring LifeCycle.
+ * This class is quite simple, because ParallelConsumer requires delegating the situation to itself.
+ * @author ...
+ * @since 3.2.0
+ */
+
+public class ParallelConsumerFactory<K, V> implements SmartLifecycle {
+
+	public static final String DEFAULT_BEAN_NAME = "parallelConsumerFactory";
+	private final ParallelConsumerContext<K, V> parallelConsumerContext;
+	private final DefaultKafkaConsumerFactory<K, V> defaultKafkaConsumerFactory;
+	private boolean running;
+
+	public ParallelConsumerFactory(ParallelConsumerContext<K, V> parallelConsumerContext,
+								   DefaultKafkaConsumerFactory<K, V> defaultKafkaConsumerFactory) {
+		this.parallelConsumerContext = parallelConsumerContext;
+		this.defaultKafkaConsumerFactory = defaultKafkaConsumerFactory;
+	}
+
+
+	@Override
+	public void start() {
+		final Consumer<K, V> consumer = defaultKafkaConsumerFactory.createConsumer();
+		final ParallelStreamProcessor<K, V> parallelConsumer = parallelConsumerContext.createConsumer(consumer);
+		parallelConsumer.subscribe(parallelConsumerContext.parallelConsumerCallback().getTopics());
+		parallelConsumer.poll(recordContexts -> parallelConsumerContext.parallelConsumerCallback().accept(recordContexts));
+		this.running = true;
+	}
+
+	@Override
+	public void stop() {
+		this.parallelConsumerContext.stopParallelConsumer();
+		this.running = false;
+	}
+
+	@Override
+	public boolean isRunning() {
+		return this.running;
+	}
+}
\ No newline at end of file

From 5be340db6d196494bb3498aeb33101f9dfb98331 Mon Sep 17 00:00:00 2001
From: chickenchickenlove <ojt90902@naver.com>
Date: Sat, 27 Apr 2024 22:40:50 +0900
Subject: [PATCH 2/5] refactoring

---
 .../kafka/config/ParallelConsumerConfig.java  |  23 +++-
 .../config/ParallelConsumerConfiguration.java |  19 ++-
 .../kafka/config/ParallelConsumerContext.java |  35 ++++--
 .../kafka/core/ParallelConsumerCallback.java  |  16 ++-
 .../kafka/core/ParallelConsumerFactory.java   | 115 ++++++++++++++++--
 .../kafka/core/PollAndProduceCallback.java    |  40 ++++++
 .../core/PollAndProduceManyCallback.java      |  40 ++++++
 .../PollAndProduceManyResultCallback.java     |  32 +++++
 .../core/PollAndProduceResultCallback.java    |  31 +++++
 .../kafka/core/PollCallback.java              |  45 +++++++
 .../kafka/core/ResultConsumerCallback.java    |   9 ++
 11 files changed, 369 insertions(+), 36 deletions(-)
 create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceCallback.java
 create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceManyCallback.java
 create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceManyResultCallback.java
 create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceResultCallback.java
 create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/core/PollCallback.java
 create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/core/ResultConsumerCallback.java

diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfig.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfig.java
index 6abc17a387..0ad99e6bb2 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfig.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfig.java
@@ -24,19 +24,22 @@
 
 import io.confluent.parallelconsumer.ParallelConsumerOptions;
 import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder;
+
+import org.apache.kafka.clients.producer.Producer;
 import org.springframework.util.StringUtils;
 import org.springframework.kafka.annotation.EnableParallelConsumer;
+// It would be better to be migrated to org.springframework.boot.autoconfigure.kafka.KafkaProperties.
 
 /**
  * ParallelConsumerConfig is for config of {@link io.confluent.parallelconsumer}.
  * This will be registered as Spring Bean when {@link EnableParallelConsumer} is annotated to your spring application.
  * @author ...
- * @since 3.2.0
+ * @since 3.3
  */
 
 public class ParallelConsumerConfig {
 
-
+	public static final String DEFAULT_BEAN_NAME = "parallelConsumerConfig";
 	private static final String PARALLEL_CONSUMER_MAX_CONCURRENCY = "PARALLEL_CONSUMER_MAX_CONCURRENCY";
 	private static final String PARALLEL_CONSUMER_ORDERING = "PARALLEL_CONSUMER_ORDERING";
 	private static final String ALLOW_EAGER_PROCESSING_DURING_TRANSACTION_COMMIT = "ALLOW_EAGER_PROCESSING_DURING_TRANSACTION_COMMIT";
@@ -67,9 +70,19 @@ private ProcessingOrder toOrder(String order) {
 		};
 	}
 
-	public <K,V> ParallelConsumerOptions<K, V> toConsumerOptions(Consumer<K, V> consumer) {
+	public <K,V> ParallelConsumerOptions<K, V> toConsumerOptions(
+			ParallelConsumerOptions.ParallelConsumerOptionsBuilder<K, V> builder,
+			Consumer<K, V> consumer,
+			Producer<K, V> producer) {
+
+		builder.producer(producer);
+		return toConsumerOptions(builder, consumer);
+	}
+
+	public <K,V> ParallelConsumerOptions<K, V> toConsumerOptions(
+			ParallelConsumerOptions.ParallelConsumerOptionsBuilder<K, V> builder,
+			Consumer<K, V> consumer) {
 
-		ParallelConsumerOptions.ParallelConsumerOptionsBuilder<K, V> builder = ParallelConsumerOptions.builder();
 		builder.consumer(consumer);
 
 		final String maxConcurrencyString = this.properties.get(PARALLEL_CONSUMER_MAX_CONCURRENCY);
@@ -105,4 +118,4 @@ public <K,V> ParallelConsumerOptions<K, V> toConsumerOptions(Consumer<K, V> cons
 
 		return builder.build();
 	}
-}
\ No newline at end of file
+}
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfiguration.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfiguration.java
index a52bdb1de4..ebf1c011d4 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfiguration.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfiguration.java
@@ -16,8 +16,11 @@
 
 package org.springframework.kafka.config;
 
+import javax.annotation.Nullable;
+
 import org.springframework.context.annotation.Bean;
 import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
 import org.springframework.kafka.core.ParallelConsumerCallback;
 import org.springframework.kafka.core.ParallelConsumerFactory;
 import org.springframework.kafka.annotation.EnableParallelConsumer;
@@ -34,14 +37,22 @@
 
 public class ParallelConsumerConfiguration<K, V> {
 
+	@Bean(name = ParallelConsumerConfig.DEFAULT_BEAN_NAME)
+	public ParallelConsumerConfig parallelConsumerConfig() {
+		return new ParallelConsumerConfig();
+	}
+
 	@Bean(name = ParallelConsumerContext.DEFAULT_BEAN_NAME)
-	public ParallelConsumerContext<K,V> parallelConsumerContext(ParallelConsumerCallback<K, V> parallelConsumerCallback) {
-		return new ParallelConsumerContext(parallelConsumerCallback);
+	public ParallelConsumerContext<K,V> parallelConsumerContext(ParallelConsumerConfig parallelConsumerConfig,
+																ParallelConsumerCallback<K, V> parallelConsumerCallback) {
+		return new ParallelConsumerContext(parallelConsumerConfig,
+										   parallelConsumerCallback);
 	}
 
 	@Bean(name = ParallelConsumerFactory.DEFAULT_BEAN_NAME)
 	public ParallelConsumerFactory<K,V> parallelConsumerFactory(DefaultKafkaConsumerFactory<K,V> consumerFactory,
+																DefaultKafkaProducerFactory<K,V> producerFactory,
 																ParallelConsumerContext<K,V> parallelConsumerContext) {
-		return new ParallelConsumerFactory(parallelConsumerContext, consumerFactory);
+		return new ParallelConsumerFactory(parallelConsumerContext, consumerFactory, producerFactory);
 	}
-}
\ No newline at end of file
+}
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerContext.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerContext.java
index b07a35cdc7..2c01b2786c 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerContext.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerContext.java
@@ -16,11 +16,17 @@
 
 package org.springframework.kafka.config;
 
+import java.time.Duration;
+
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.Producer;
 import org.springframework.kafka.core.ParallelConsumerCallback;
 
+import io.confluent.parallelconsumer.JStreamParallelStreamProcessor;
 import io.confluent.parallelconsumer.ParallelConsumerOptions;
+import io.confluent.parallelconsumer.ParallelConsumerOptions.ParallelConsumerOptionsBuilder;
 import io.confluent.parallelconsumer.ParallelStreamProcessor;
+import io.confluent.parallelconsumer.internal.DrainingCloseable.DrainingMode;
 
 /**
  * This class is for aggregating all related with ParallelConsumer.
@@ -33,28 +39,31 @@ public class ParallelConsumerContext<K,V> {
 
 	public static final String DEFAULT_BEAN_NAME = "parallelConsumerContext";
 	private final ParallelConsumerConfig parallelConsumerConfig;
-	private final ParallelConsumerCallback<K,V> parallelConsumerCallback;
-	private ParallelStreamProcessor<K, V> processor;
+	private final ParallelConsumerCallback<K, V> parallelConsumerCallback;
 
-	public ParallelConsumerContext(ParallelConsumerCallback<K, V> callback) {
-		this.parallelConsumerConfig = new ParallelConsumerConfig();
+	public ParallelConsumerContext(ParallelConsumerConfig parallelConsumerConfig,
+								   ParallelConsumerCallback<K, V> callback) {
+		this.parallelConsumerConfig = parallelConsumerConfig;
 		this.parallelConsumerCallback = callback;
 	}
 
-
-	public ParallelConsumerCallback<K,V> parallelConsumerCallback() {
+	public ParallelConsumerCallback<K, V> parallelConsumerCallback() {
 		return this.parallelConsumerCallback;
 	}
 
+	public ParallelConsumerOptions<K, V> getParallelConsumerOptions(Consumer<K, V> consumer) {
+		final ParallelConsumerOptionsBuilder<K, V> builder = ParallelConsumerOptions.builder();
+		return parallelConsumerConfig.toConsumerOptions(builder, consumer);
+	}
 
-	public ParallelStreamProcessor<K, V> createConsumer(Consumer<K,V> consumer) {
-		final ParallelConsumerOptions<K, V> options = parallelConsumerConfig.toConsumerOptions(consumer);
-		this.processor = ParallelStreamProcessor.createEosStreamProcessor(options);
-		return this.processor;
+	public ParallelConsumerOptions<K, V> getParallelConsumerOptions(Consumer<K, V> consumer, Producer<K, V> producer) {
+		final ParallelConsumerOptionsBuilder<K, V> builder = ParallelConsumerOptions.builder();
+		return parallelConsumerConfig.toConsumerOptions(builder, consumer, producer);
 	}
 
-	public void stopParallelConsumer() {
-		this.processor.close();
+	public void stop(ParallelStreamProcessor<K, V> parallelStreamProcessor) {
+		parallelStreamProcessor.close();
+
 	}
 
-}
\ No newline at end of file
+}
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerCallback.java b/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerCallback.java
index 7bdf7ae06d..6c3072a15f 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerCallback.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerCallback.java
@@ -17,8 +17,12 @@
 package org.springframework.kafka.core;
 
 import java.util.List;
+import java.util.regex.Pattern;
 
 import io.confluent.parallelconsumer.PollContext;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.springframework.kafka.config.ParallelConsumerContext;
 
 /**
@@ -32,11 +36,13 @@
 public interface ParallelConsumerCallback<K, V> {
 
 	/**
-	 * This is for {@link ParallelConsumerFactory} and {@link ParallelConsumerContext}.
-	 * ParallelConsumer will process the consumed messages using this callback.
-	 * @param context context which Parallel Consumer produce
-	 * @return void.
+	 * ...
 	 */
-	void accept(PollContext<K, V> context);
 	List<String> getTopics();
+	default Pattern getSubscribeTopicsPattern(){
+		return null;
+	}
+	default ConsumerRebalanceListener getRebalanceListener(){
+		return null;
+	}
 }
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerFactory.java
index ab70a0503e..4cea9234c9 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerFactory.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerFactory.java
@@ -16,45 +16,113 @@
 
 package org.springframework.kafka.core;
 
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.regex.Pattern;
+
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
 import org.springframework.context.SmartLifecycle;
 import org.springframework.kafka.config.ParallelConsumerContext;
 
+import io.confluent.parallelconsumer.ParallelConsumer;
+import io.confluent.parallelconsumer.ParallelConsumerOptions;
 import io.confluent.parallelconsumer.ParallelStreamProcessor;
+import io.confluent.parallelconsumer.ParallelStreamProcessor.ConsumeProduceResult;
 
 /**
  * ParallelConsumerFactory will be started and closed by Spring LifeCycle.
  * This class is quite simple, because ParallelConsumer requires delegating the situation to itself.
- * @author ...
+ * @author Sanghyeok An
  * @since 3.2.0
  */
 
 public class ParallelConsumerFactory<K, V> implements SmartLifecycle {
 
 	public static final String DEFAULT_BEAN_NAME = "parallelConsumerFactory";
-	private final ParallelConsumerContext<K, V> parallelConsumerContext;
+
 	private final DefaultKafkaConsumerFactory<K, V> defaultKafkaConsumerFactory;
+	private final DefaultKafkaProducerFactory<K, V> defaultKafkaProducerFactory;
+	private final ParallelConsumerContext<K, V> parallelConsumerContext;
+
+	private final ParallelStreamProcessor<K, V> parallelConsumer;
+	private final ParallelConsumerOptions<K, V> parallelConsumerOptions;
 	private boolean running;
 
 	public ParallelConsumerFactory(ParallelConsumerContext<K, V> parallelConsumerContext,
-								   DefaultKafkaConsumerFactory<K, V> defaultKafkaConsumerFactory) {
+								   DefaultKafkaConsumerFactory<K, V> defaultKafkaConsumerFactory,
+								   DefaultKafkaProducerFactory<K, V> defaultKafkaProducerFactory) {
 		this.parallelConsumerContext = parallelConsumerContext;
 		this.defaultKafkaConsumerFactory = defaultKafkaConsumerFactory;
+		this.defaultKafkaProducerFactory = defaultKafkaProducerFactory;
+
+		final Consumer<K, V> kafkaConsumer = defaultKafkaConsumerFactory.createConsumer();
+		final Producer<K, V> kafkaProducer = defaultKafkaProducerFactory.createProducer();
+		this.parallelConsumerOptions = parallelConsumerOptions(kafkaConsumer, kafkaProducer);
+		this.parallelConsumer = ParallelStreamProcessor.createEosStreamProcessor(this.parallelConsumerOptions);
 	}
 
 
+	private ParallelConsumerOptions<K, V> parallelConsumerOptions(Consumer<K, V> consumer,
+																  Producer<K, V> producer) {
+		final ParallelConsumerCallback<K, V> callback = parallelConsumerContext.parallelConsumerCallback();
+		if (callback instanceof PollAndProduceManyCallback<K,V> ||
+			callback instanceof PollAndProduceCallback<K,V>) {
+			return parallelConsumerContext.getParallelConsumerOptions(consumer, producer);
+		} else {
+			return parallelConsumerContext.getParallelConsumerOptions(consumer);
+		}
+	}
+
 	@Override
 	public void start() {
-		final Consumer<K, V> consumer = defaultKafkaConsumerFactory.createConsumer();
-		final ParallelStreamProcessor<K, V> parallelConsumer = parallelConsumerContext.createConsumer(consumer);
-		parallelConsumer.subscribe(parallelConsumerContext.parallelConsumerCallback().getTopics());
-		parallelConsumer.poll(recordContexts -> parallelConsumerContext.parallelConsumerCallback().accept(recordContexts));
+		subscribe();
+
+		final ParallelConsumerCallback<K, V> callback0 = parallelConsumerContext.parallelConsumerCallback();
+
+		if (callback0 instanceof ResultConsumerCallback) {
+			if (callback0 instanceof PollAndProduceManyResultCallback<K, V>) {
+				final PollAndProduceManyResultCallback<K, V> callback =
+						(PollAndProduceManyResultCallback<K, V>) callback0;
+
+				this.parallelConsumer.pollAndProduceMany(callback::accept, callback::resultConsumer);
+			} else if (callback0 instanceof PollAndProduceCallback<K, V>) {
+				final PollAndProduceResultCallback<K, V> callback =
+						(PollAndProduceResultCallback<K, V>) callback0;
+
+				this.parallelConsumer.pollAndProduce(callback::accept, callback::resultConsumer);
+			} else {
+				throw new UnsupportedOperationException();
+			}
+		} else {
+			if (callback0 instanceof PollAndProduceManyCallback<K, V>) {
+				final PollAndProduceManyCallback<K, V> callback =
+						(PollAndProduceManyCallback<K, V>) callback0;
+
+				this.parallelConsumer.pollAndProduceMany(callback::accept);
+			} else if (callback0 instanceof PollAndProduceCallback<K, V>) {
+				final PollAndProduceCallback<K, V> callback =
+						(PollAndProduceCallback<K, V>) callback0;
+
+				this.parallelConsumer.pollAndProduce(callback::accept);
+			} else if (callback0 instanceof PollCallback<K, V>) {
+				final PollCallback<K, V> callback = (PollCallback<K, V>) callback0;
+
+				this.parallelConsumer.poll(callback::accept);
+			} else {
+				throw new UnsupportedOperationException();
+			}
+		}
 		this.running = true;
 	}
 
 	@Override
 	public void stop() {
-		this.parallelConsumerContext.stopParallelConsumer();
+		this.parallelConsumerContext.stop(this.parallelConsumer);
 		this.running = false;
 	}
 
@@ -62,4 +130,33 @@ public void stop() {
 	public boolean isRunning() {
 		return this.running;
 	}
-}
\ No newline at end of file
+
+	private void subscribe() {
+		final ParallelConsumerCallback<K, V> callback = this.parallelConsumerContext.parallelConsumerCallback();
+
+		final List<String> topics = callback.getTopics();
+		final ConsumerRebalanceListener rebalanceListener = callback.getRebalanceListener();
+
+		if (topics != null && !topics.isEmpty()) {
+			subscribe(topics, rebalanceListener);
+		} else {
+			subscribe(callback.getSubscribeTopicsPattern(), rebalanceListener);
+		}
+	}
+
+	private void subscribe(Collection<String> topics, ConsumerRebalanceListener callback){
+		if (callback == null) {
+			this.parallelConsumer.subscribe(topics);
+		} else {
+			this.parallelConsumer.subscribe(topics, callback);
+		}
+	}
+
+	private void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
+		if (callback == null) {
+			this.parallelConsumer.subscribe(pattern);
+		} else {
+			this.parallelConsumer.subscribe(pattern, callback);
+		}
+	}
+}
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceCallback.java b/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceCallback.java
new file mode 100644
index 0000000000..3d6d27f253
--- /dev/null
+++ b/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceCallback.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2014-2024 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.core;
+
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.regex.Pattern;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.springframework.kafka.config.ParallelConsumerContext;
+
+import io.confluent.parallelconsumer.ParallelStreamProcessor.ConsumeProduceResult;
+import io.confluent.parallelconsumer.PollContext;
+
+/**
+ * ...
+ */
+
+public interface PollAndProduceCallback<K, V> extends ParallelConsumerCallback<K, V> {
+
+	/**
+	 * ...
+	 */
+	ProducerRecord<K, V> accept(PollContext<K, V> context);
+}
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceManyCallback.java b/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceManyCallback.java
new file mode 100644
index 0000000000..8868f9ffe5
--- /dev/null
+++ b/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceManyCallback.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2014-2024 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.core;
+
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.regex.Pattern;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.springframework.kafka.config.ParallelConsumerContext;
+
+import io.confluent.parallelconsumer.ParallelStreamProcessor.ConsumeProduceResult;
+import io.confluent.parallelconsumer.PollContext;
+
+/**
+ * ...
+ */
+
+public interface PollAndProduceManyCallback<K, V> extends ParallelConsumerCallback<K, V> {
+
+	/**
+	 * ...
+	 */
+	List<ProducerRecord<K, V>> accept(PollContext<K, V> context);
+}
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceManyResultCallback.java b/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceManyResultCallback.java
new file mode 100644
index 0000000000..885991cf3c
--- /dev/null
+++ b/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceManyResultCallback.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2014-2024 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.core;
+
+import java.util.List;
+import java.util.function.Consumer;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import io.confluent.parallelconsumer.ParallelStreamProcessor.ConsumeProduceResult;
+import io.confluent.parallelconsumer.PollContext;
+
+/**
+ * ...
+ */
+
+public interface PollAndProduceManyResultCallback<K, V> extends PollAndProduceManyCallback<K, V>, ResultConsumerCallback<K, V> {
+}
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceResultCallback.java b/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceResultCallback.java
new file mode 100644
index 0000000000..57f1f68732
--- /dev/null
+++ b/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceResultCallback.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2014-2024 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.core;
+
+import java.util.function.Consumer;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import io.confluent.parallelconsumer.ParallelStreamProcessor.ConsumeProduceResult;
+import io.confluent.parallelconsumer.PollContext;
+
+/**
+ * ...
+ */
+
+public interface PollAndProduceResultCallback<K, V> extends PollAndProduceCallback<K, V>, ResultConsumerCallback<K, V> {
+}
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/PollCallback.java b/spring-kafka/src/main/java/org/springframework/kafka/core/PollCallback.java
new file mode 100644
index 0000000000..5b7e765dc9
--- /dev/null
+++ b/spring-kafka/src/main/java/org/springframework/kafka/core/PollCallback.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2014-2024 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.core;
+
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.springframework.kafka.config.ParallelConsumerContext;
+
+import io.confluent.parallelconsumer.PollContext;
+
+/**
+ * User should create ConcreteClass of this and register it as Spring Bean.
+ * Concrete class of ParallelConsumerCallback will be registered {@link ParallelConsumerContext},
+ * and then it will be used in {@link ParallelConsumerFactory} when ParallelConsumerFactory start.
+ * @author ...
+ * @since 3.2.0
+ */
+
+public interface PollCallback<K, V> extends ParallelConsumerCallback<K, V> {
+
+	/**
+	 * This is for {@link ParallelConsumerFactory} and {@link ParallelConsumerContext}.
+	 * ParallelConsumer will process the consumed messages using this callback.
+	 * @param context context which Parallel Consumer produce
+	 * @return void.
+	 */
+	void accept(PollContext<K, V> context);
+}
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/ResultConsumerCallback.java b/spring-kafka/src/main/java/org/springframework/kafka/core/ResultConsumerCallback.java
new file mode 100644
index 0000000000..07a32cf437
--- /dev/null
+++ b/spring-kafka/src/main/java/org/springframework/kafka/core/ResultConsumerCallback.java
@@ -0,0 +1,9 @@
+package org.springframework.kafka.core;
+
+import java.util.function.Consumer;
+
+import io.confluent.parallelconsumer.ParallelStreamProcessor.ConsumeProduceResult;
+
+public interface ResultConsumerCallback<K, V> {
+	Consumer<ConsumeProduceResult<K, V, K, V>> resultConsumer(ConsumeProduceResult<K, V, K, V> result);
+}

From 4eca9795342bc482e39214c1271a718ac24611a6 Mon Sep 17 00:00:00 2001
From: chickenchickenlove <ojt90902@naver.com>
Date: Sun, 28 Apr 2024 08:11:52 +0900
Subject: [PATCH 3/5] refactor

---
 ...allelConsumerOptionsProviderCondition.java |  14 ++
 .../kafka/config/ParallelConsumerConfig.java  | 152 ++++++++++------
 .../config/ParallelConsumerConfiguration.java |  25 ++-
 .../kafka/config/ParallelConsumerContext.java |  13 +-
 .../kafka/core/ParallelConsumerFactory.java   |  86 +++++----
 .../ParallelConsumerCallback.java             |  22 ++-
 .../ParallelConsumerOptionsProvider.java      | 168 ++++++++++++++++++
 .../Poll.java}                                |  16 +-
 .../PollAndProduce.java}                      |  16 +-
 .../PollAndProduceMany.java}                  |  15 +-
 .../PollAndProduceManyResult.java}            |  23 +--
 .../PollAndProduceResult.java}                |  23 +--
 12 files changed, 406 insertions(+), 167 deletions(-)
 create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/config/OnMissingParallelConsumerOptionsProviderCondition.java
 rename spring-kafka/src/main/java/org/springframework/kafka/core/{ => parallelconsumer}/ParallelConsumerCallback.java (76%)
 create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerOptionsProvider.java
 rename spring-kafka/src/main/java/org/springframework/kafka/core/{PollCallback.java => parallelconsumer/Poll.java} (80%)
 rename spring-kafka/src/main/java/org/springframework/kafka/core/{PollAndProduceResultCallback.java => parallelconsumer/PollAndProduce.java} (72%)
 rename spring-kafka/src/main/java/org/springframework/kafka/core/{PollAndProduceManyResultCallback.java => parallelconsumer/PollAndProduceMany.java} (73%)
 rename spring-kafka/src/main/java/org/springframework/kafka/core/{PollAndProduceCallback.java => parallelconsumer/PollAndProduceManyResult.java} (51%)
 rename spring-kafka/src/main/java/org/springframework/kafka/core/{PollAndProduceManyCallback.java => parallelconsumer/PollAndProduceResult.java} (50%)

diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/OnMissingParallelConsumerOptionsProviderCondition.java b/spring-kafka/src/main/java/org/springframework/kafka/config/OnMissingParallelConsumerOptionsProviderCondition.java
new file mode 100644
index 0000000000..a383383755
--- /dev/null
+++ b/spring-kafka/src/main/java/org/springframework/kafka/config/OnMissingParallelConsumerOptionsProviderCondition.java
@@ -0,0 +1,14 @@
+package org.springframework.kafka.config;
+
+import org.springframework.context.annotation.Condition;
+import org.springframework.context.annotation.ConditionContext;
+import org.springframework.core.type.AnnotatedTypeMetadata;
+import org.springframework.kafka.core.parallelconsumer.ParallelConsumerOptionsProvider;
+
+public class OnMissingParallelConsumerOptionsProviderCondition implements Condition {
+
+	@Override
+	public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
+		return context.getBeanFactory().getBean(ParallelConsumerOptionsProvider.class) == null;
+	}
+}
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfig.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfig.java
index 0ad99e6bb2..8cb5557f35 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfig.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfig.java
@@ -16,18 +16,15 @@
 
 package org.springframework.kafka.config;
 
-import java.time.Duration;
-import java.util.HashMap;
-import java.util.Map;
 
 import org.apache.kafka.clients.consumer.Consumer;
 
 import io.confluent.parallelconsumer.ParallelConsumerOptions;
-import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder;
 
 import org.apache.kafka.clients.producer.Producer;
-import org.springframework.util.StringUtils;
+import org.springframework.kafka.core.parallelconsumer.ParallelConsumerOptionsProvider;
 import org.springframework.kafka.annotation.EnableParallelConsumer;
+
 // It would be better to be migrated to org.springframework.boot.autoconfigure.kafka.KafkaProperties.
 
 /**
@@ -37,40 +34,16 @@
  * @since 3.3
  */
 
-public class ParallelConsumerConfig {
+public class ParallelConsumerConfig<K, V> {
 
 	public static final String DEFAULT_BEAN_NAME = "parallelConsumerConfig";
-	private static final String PARALLEL_CONSUMER_MAX_CONCURRENCY = "PARALLEL_CONSUMER_MAX_CONCURRENCY";
-	private static final String PARALLEL_CONSUMER_ORDERING = "PARALLEL_CONSUMER_ORDERING";
-	private static final String ALLOW_EAGER_PROCESSING_DURING_TRANSACTION_COMMIT = "ALLOW_EAGER_PROCESSING_DURING_TRANSACTION_COMMIT";
-	private static final String COMMIT_LOCK_ACQUISITION_TIMEOUT = "COMMIT_LOCK_ACQUISITION_TIMEOUT";
-	private static final String COMMIT_INTERVAL = "COMMIT_INTERVAL";
-	private final Map<String, String> properties = new HashMap<>();
-
-	public ParallelConsumerConfig() {
-
-		final String maxConcurrency = System.getenv(PARALLEL_CONSUMER_MAX_CONCURRENCY);
-		final String ordering = System.getenv(PARALLEL_CONSUMER_ORDERING);
-		final String allowEagerProcessingDuringTransactionCommit = System.getenv(ALLOW_EAGER_PROCESSING_DURING_TRANSACTION_COMMIT);
-		final String commitLockAcquisitionTimeout = System.getenv(COMMIT_LOCK_ACQUISITION_TIMEOUT);
-		final String commitInterval = System.getenv(COMMIT_INTERVAL);
-
-		this.properties.put(PARALLEL_CONSUMER_MAX_CONCURRENCY, maxConcurrency);
-		this.properties.put(PARALLEL_CONSUMER_ORDERING, ordering);
-		this.properties.put(ALLOW_EAGER_PROCESSING_DURING_TRANSACTION_COMMIT, allowEagerProcessingDuringTransactionCommit);
-		this.properties.put(COMMIT_LOCK_ACQUISITION_TIMEOUT, commitLockAcquisitionTimeout);
-		this.properties.put(COMMIT_INTERVAL, commitInterval);
-	}
+	private final ParallelConsumerOptionsProvider<K, V> provider;
 
-	private ProcessingOrder toOrder(String order) {
-		return switch (order) {
-			case "partition" -> ProcessingOrder.PARTITION;
-			case "unordered" -> ProcessingOrder.UNORDERED;
-			default -> ProcessingOrder.KEY; // Confluent Consumer Default Policy
-		};
+	public ParallelConsumerConfig(ParallelConsumerOptionsProvider<K, V> provider) {
+		this.provider = provider;
 	}
 
-	public <K,V> ParallelConsumerOptions<K, V> toConsumerOptions(
+	public ParallelConsumerOptions<K, V> toConsumerOptions(
 			ParallelConsumerOptions.ParallelConsumerOptionsBuilder<K, V> builder,
 			Consumer<K, V> consumer,
 			Producer<K, V> producer) {
@@ -79,43 +52,110 @@ public <K,V> ParallelConsumerOptions<K, V> toConsumerOptions(
 		return toConsumerOptions(builder, consumer);
 	}
 
-	public <K,V> ParallelConsumerOptions<K, V> toConsumerOptions(
+	public ParallelConsumerOptions<K, V> toConsumerOptions(
 			ParallelConsumerOptions.ParallelConsumerOptionsBuilder<K, V> builder,
 			Consumer<K, V> consumer) {
-
 		builder.consumer(consumer);
+		return buildRemainOptions(builder);
+	}
+
+	private ParallelConsumerOptions<K, V> buildRemainOptions(ParallelConsumerOptions.ParallelConsumerOptionsBuilder<K, V> builder) {
+		if (this.provider.managedExecutorService() != null){
+			builder.managedExecutorService(this.provider.managedExecutorService());
+		}
 
-		final String maxConcurrencyString = this.properties.get(PARALLEL_CONSUMER_MAX_CONCURRENCY);
-		final String orderingString = this.properties.get(PARALLEL_CONSUMER_ORDERING);
-		final String allowEagerProcessingDuringTransactionCommitString = this.properties.get(ALLOW_EAGER_PROCESSING_DURING_TRANSACTION_COMMIT);
-		final String commitLockAcquisitionTimeoutString = this.properties.get(COMMIT_LOCK_ACQUISITION_TIMEOUT);
-		final String commitIntervalString = this.properties.get(COMMIT_INTERVAL);
+		if (this.provider.managedThreadFactory() != null){
+			builder.managedThreadFactory(this.provider.managedThreadFactory());
+		}
+
+		if (this.provider.meterRegistry() != null){
+			builder.meterRegistry(this.provider.meterRegistry());
+		}
+
+		if (this.provider.pcInstanceTag() != null){
+			builder.pcInstanceTag(this.provider.pcInstanceTag());
+		}
+
+		if (this.provider.metricsTags() != null){
+			builder.metricsTags(this.provider.metricsTags());
+		}
+
+		if (this.provider.allowEagerProcessingDuringTransactionCommit() != null){
+			builder.allowEagerProcessingDuringTransactionCommit(this.provider.allowEagerProcessingDuringTransactionCommit());
+		}
+
+		if (this.provider.commitLockAcquisitionTimeout() != null){
+			builder.commitLockAcquisitionTimeout(this.provider.commitLockAcquisitionTimeout());
+		}
+
+		if (this.provider.produceLockAcquisitionTimeout() != null){
+			builder.produceLockAcquisitionTimeout(this.provider.produceLockAcquisitionTimeout());
+		}
+
+		if (this.provider.commitInterval() != null){
+			builder.commitInterval(this.provider.commitInterval());
+		}
 
-		if (StringUtils.hasText(maxConcurrencyString)) {
-			final Integer maxConcurrency = Integer.valueOf(maxConcurrencyString);
-			builder.maxConcurrency(maxConcurrency);
+		if (this.provider.ordering() != null){
+			builder.ordering(this.provider.ordering());
 		}
 
-		if (StringUtils.hasText(orderingString)) {
-			final ProcessingOrder processingOrder = toOrder(orderingString);
-			builder.ordering(processingOrder);
+		if (this.provider.commitMode() != null){
+			builder.commitMode(this.provider.commitMode());
 		}
 
-		if (StringUtils.hasText(allowEagerProcessingDuringTransactionCommitString)) {
-			final Boolean allowEagerProcessingDuringTransactionCommit = Boolean.valueOf(allowEagerProcessingDuringTransactionCommitString);
-			builder.allowEagerProcessingDuringTransactionCommit(allowEagerProcessingDuringTransactionCommit);
+		if (this.provider.maxConcurrency() != null){
+			builder.maxConcurrency(this.provider.maxConcurrency());
 		}
 
-		if (StringUtils.hasText(commitLockAcquisitionTimeoutString)) {
-			final Long commitLockAcquisitionTimeout = Long.valueOf(commitLockAcquisitionTimeoutString);
-			builder.commitLockAcquisitionTimeout(Duration.ofSeconds(commitLockAcquisitionTimeout));
+		if (this.provider.invalidOffsetMetadataPolicy() != null){
+			builder.invalidOffsetMetadataPolicy(this.provider.invalidOffsetMetadataPolicy());
 		}
 
-		if (StringUtils.hasText(commitIntervalString)) {
-			final Long commitInterval = Long.valueOf(commitIntervalString);
-			builder.commitInterval(Duration.ofMillis(commitInterval));
+		if (this.provider.retryDelayProvider() != null){
+			builder.retryDelayProvider(this.provider.retryDelayProvider());
+		}
+
+		if (this.provider.sendTimeout() != null){
+			builder.sendTimeout(this.provider.sendTimeout());
+		}
+
+		if (this.provider.offsetCommitTimeout() != null){
+			builder.offsetCommitTimeout(this.provider.offsetCommitTimeout());
+		}
+
+		if (this.provider.batchSize() != null){
+			builder.batchSize(this.provider.batchSize());
+		}
+
+		if (this.provider.thresholdForTimeSpendInQueueWarning() != null){
+			builder.thresholdForTimeSpendInQueueWarning(this.provider.thresholdForTimeSpendInQueueWarning());
+		}
+
+		if (this.provider.maxFailureHistory() != null){
+			builder.maxFailureHistory(this.provider.maxFailureHistory());
+		}
+
+		if (this.provider.shutdownTimeout() != null){
+			builder.shutdownTimeout(this.provider.shutdownTimeout());
+		}
+
+		if (this.provider.drainTimeout() != null){
+			builder.drainTimeout(this.provider.drainTimeout());
+		}
+
+		if (this.provider.messageBufferSize() != null){
+			builder.messageBufferSize(this.provider.messageBufferSize());
+		}
+
+		if (this.provider.initialLoadFactor() != null){
+			builder.initialLoadFactor(this.provider.initialLoadFactor());
+		}
+		if (this.provider.maximumLoadFactor() != null){
+			builder.maximumLoadFactor(this.provider.maximumLoadFactor());
 		}
 
 		return builder.build();
 	}
+
 }
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfiguration.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfiguration.java
index ebf1c011d4..da5e556826 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfiguration.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfiguration.java
@@ -16,14 +16,14 @@
 
 package org.springframework.kafka.config;
 
-import javax.annotation.Nullable;
-
 import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Conditional;
 import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
 import org.springframework.kafka.core.DefaultKafkaProducerFactory;
-import org.springframework.kafka.core.ParallelConsumerCallback;
+import org.springframework.kafka.core.parallelconsumer.ParallelConsumerCallback;
 import org.springframework.kafka.core.ParallelConsumerFactory;
 import org.springframework.kafka.annotation.EnableParallelConsumer;
+import org.springframework.kafka.core.parallelconsumer.ParallelConsumerOptionsProvider;
 
 /**
  * If User decide to use parallelConsumer on SpringKafka, User should import this class to their ComponentScan scopes.
@@ -37,22 +37,29 @@
 
 public class ParallelConsumerConfiguration<K, V> {
 
+	@Bean
+	@Conditional(OnMissingParallelConsumerOptionsProviderCondition.class)
+	public ParallelConsumerOptionsProvider<K, V> parallelConsumerOptionsProvider() {
+		return new ParallelConsumerOptionsProvider<K, V>() {};
+	}
+
 	@Bean(name = ParallelConsumerConfig.DEFAULT_BEAN_NAME)
-	public ParallelConsumerConfig parallelConsumerConfig() {
-		return new ParallelConsumerConfig();
+	public ParallelConsumerConfig<K, V> parallelConsumerConfig(ParallelConsumerOptionsProvider<K, V> parallelConsumerOptionsProvider) {
+		return new ParallelConsumerConfig<K, V>(parallelConsumerOptionsProvider);
 	}
 
 	@Bean(name = ParallelConsumerContext.DEFAULT_BEAN_NAME)
-	public ParallelConsumerContext<K,V> parallelConsumerContext(ParallelConsumerConfig parallelConsumerConfig,
+	public ParallelConsumerContext<K,V> parallelConsumerContext(ParallelConsumerConfig<K, V> parallelConsumerConfig,
 																ParallelConsumerCallback<K, V> parallelConsumerCallback) {
-		return new ParallelConsumerContext(parallelConsumerConfig,
-										   parallelConsumerCallback);
+		return new ParallelConsumerContext<K, V>(parallelConsumerConfig,
+												 parallelConsumerCallback);
 	}
 
 	@Bean(name = ParallelConsumerFactory.DEFAULT_BEAN_NAME)
 	public ParallelConsumerFactory<K,V> parallelConsumerFactory(DefaultKafkaConsumerFactory<K,V> consumerFactory,
 																DefaultKafkaProducerFactory<K,V> producerFactory,
 																ParallelConsumerContext<K,V> parallelConsumerContext) {
-		return new ParallelConsumerFactory(parallelConsumerContext, consumerFactory, producerFactory);
+		return new ParallelConsumerFactory<K, V>(parallelConsumerContext, consumerFactory, producerFactory);
 	}
+
 }
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerContext.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerContext.java
index 2c01b2786c..4bb2579a13 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerContext.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerContext.java
@@ -16,17 +16,13 @@
 
 package org.springframework.kafka.config;
 
-import java.time.Duration;
-
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.producer.Producer;
-import org.springframework.kafka.core.ParallelConsumerCallback;
+import org.springframework.kafka.core.parallelconsumer.ParallelConsumerCallback;
 
-import io.confluent.parallelconsumer.JStreamParallelStreamProcessor;
 import io.confluent.parallelconsumer.ParallelConsumerOptions;
 import io.confluent.parallelconsumer.ParallelConsumerOptions.ParallelConsumerOptionsBuilder;
 import io.confluent.parallelconsumer.ParallelStreamProcessor;
-import io.confluent.parallelconsumer.internal.DrainingCloseable.DrainingMode;
 
 /**
  * This class is for aggregating all related with ParallelConsumer.
@@ -41,7 +37,7 @@ public class ParallelConsumerContext<K,V> {
 	private final ParallelConsumerConfig parallelConsumerConfig;
 	private final ParallelConsumerCallback<K, V> parallelConsumerCallback;
 
-	public ParallelConsumerContext(ParallelConsumerConfig parallelConsumerConfig,
+	public ParallelConsumerContext(ParallelConsumerConfig<K, V> parallelConsumerConfig,
 								   ParallelConsumerCallback<K, V> callback) {
 		this.parallelConsumerConfig = parallelConsumerConfig;
 		this.parallelConsumerCallback = callback;
@@ -61,9 +57,4 @@ public ParallelConsumerOptions<K, V> getParallelConsumerOptions(Consumer<K, V> c
 		return parallelConsumerConfig.toConsumerOptions(builder, consumer, producer);
 	}
 
-	public void stop(ParallelStreamProcessor<K, V> parallelStreamProcessor) {
-		parallelStreamProcessor.close();
-
-	}
-
 }
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerFactory.java
index 4cea9234c9..43b77c1df1 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerFactory.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerFactory.java
@@ -16,29 +16,32 @@
 
 package org.springframework.kafka.core;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.List;
-import java.util.Objects;
 import java.util.regex.Pattern;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.springframework.context.SmartLifecycle;
 import org.springframework.kafka.config.ParallelConsumerContext;
+import org.springframework.kafka.core.parallelconsumer.ParallelConsumerCallback;
+import org.springframework.kafka.core.parallelconsumer.PollAndProduce;
+import org.springframework.kafka.core.parallelconsumer.PollAndProduceMany;
+import org.springframework.kafka.core.parallelconsumer.PollAndProduceManyResult;
+import org.springframework.kafka.core.parallelconsumer.PollAndProduceResult;
+import org.springframework.kafka.core.parallelconsumer.Poll;
 
-import io.confluent.parallelconsumer.ParallelConsumer;
 import io.confluent.parallelconsumer.ParallelConsumerOptions;
 import io.confluent.parallelconsumer.ParallelStreamProcessor;
-import io.confluent.parallelconsumer.ParallelStreamProcessor.ConsumeProduceResult;
+import io.confluent.parallelconsumer.internal.DrainingCloseable.DrainingMode;
 
 /**
  * ParallelConsumerFactory will be started and closed by Spring LifeCycle.
  * This class is quite simple, because ParallelConsumer requires delegating the situation to itself.
  * @author Sanghyeok An
- * @since 3.2.0
+ * @since 3.3
  */
 
 public class ParallelConsumerFactory<K, V> implements SmartLifecycle {
@@ -48,7 +51,6 @@ public class ParallelConsumerFactory<K, V> implements SmartLifecycle {
 	private final DefaultKafkaConsumerFactory<K, V> defaultKafkaConsumerFactory;
 	private final DefaultKafkaProducerFactory<K, V> defaultKafkaProducerFactory;
 	private final ParallelConsumerContext<K, V> parallelConsumerContext;
-
 	private final ParallelStreamProcessor<K, V> parallelConsumer;
 	private final ParallelConsumerOptions<K, V> parallelConsumerOptions;
 	private boolean running;
@@ -70,8 +72,8 @@ public ParallelConsumerFactory(ParallelConsumerContext<K, V> parallelConsumerCon
 	private ParallelConsumerOptions<K, V> parallelConsumerOptions(Consumer<K, V> consumer,
 																  Producer<K, V> producer) {
 		final ParallelConsumerCallback<K, V> callback = parallelConsumerContext.parallelConsumerCallback();
-		if (callback instanceof PollAndProduceManyCallback<K,V> ||
-			callback instanceof PollAndProduceCallback<K,V>) {
+		if (callback instanceof PollAndProduceMany<K,V> ||
+			callback instanceof PollAndProduce<K,V>) {
 			return parallelConsumerContext.getParallelConsumerOptions(consumer, producer);
 		} else {
 			return parallelConsumerContext.getParallelConsumerOptions(consumer);
@@ -85,35 +87,40 @@ public void start() {
 		final ParallelConsumerCallback<K, V> callback0 = parallelConsumerContext.parallelConsumerCallback();
 
 		if (callback0 instanceof ResultConsumerCallback) {
-			if (callback0 instanceof PollAndProduceManyResultCallback<K, V>) {
-				final PollAndProduceManyResultCallback<K, V> callback =
-						(PollAndProduceManyResultCallback<K, V>) callback0;
+			if (callback0 instanceof PollAndProduceManyResult<K, V>) {
+				final PollAndProduceManyResult<K, V> callback =
+						(PollAndProduceManyResult<K, V>) callback0;
 
 				this.parallelConsumer.pollAndProduceMany(callback::accept, callback::resultConsumer);
-			} else if (callback0 instanceof PollAndProduceCallback<K, V>) {
-				final PollAndProduceResultCallback<K, V> callback =
-						(PollAndProduceResultCallback<K, V>) callback0;
+			}
+			else if (callback0 instanceof PollAndProduce<K, V>) {
+				final PollAndProduceResult<K, V> callback =
+						(PollAndProduceResult<K, V>) callback0;
 
 				this.parallelConsumer.pollAndProduce(callback::accept, callback::resultConsumer);
-			} else {
+			}
+			else {
 				throw new UnsupportedOperationException();
 			}
 		} else {
-			if (callback0 instanceof PollAndProduceManyCallback<K, V>) {
-				final PollAndProduceManyCallback<K, V> callback =
-						(PollAndProduceManyCallback<K, V>) callback0;
+			if (callback0 instanceof PollAndProduceMany<K, V>) {
+				final PollAndProduceMany<K, V> callback =
+						(PollAndProduceMany<K, V>) callback0;
 
 				this.parallelConsumer.pollAndProduceMany(callback::accept);
-			} else if (callback0 instanceof PollAndProduceCallback<K, V>) {
-				final PollAndProduceCallback<K, V> callback =
-						(PollAndProduceCallback<K, V>) callback0;
+			}
+			else if (callback0 instanceof PollAndProduce<K, V>) {
+				final PollAndProduce<K, V> callback =
+						(PollAndProduce<K, V>) callback0;
 
 				this.parallelConsumer.pollAndProduce(callback::accept);
-			} else if (callback0 instanceof PollCallback<K, V>) {
-				final PollCallback<K, V> callback = (PollCallback<K, V>) callback0;
+			}
+			else if (callback0 instanceof Poll<K, V>) {
+				final Poll<K, V> callback = (Poll<K, V>) callback0;
 
 				this.parallelConsumer.poll(callback::accept);
-			} else {
+			}
+			else {
 				throw new UnsupportedOperationException();
 			}
 		}
@@ -122,7 +129,12 @@ public void start() {
 
 	@Override
 	public void stop() {
-		this.parallelConsumerContext.stop(this.parallelConsumer);
+		final ParallelConsumerCallback<K, V> callback =
+				this.parallelConsumerContext.parallelConsumerCallback();
+		final DrainingMode drainingMode = callback.drainingMode();
+		final Duration duration = callback.drainTimeOut();
+
+		this.parallelConsumer.close(duration, drainingMode);
 		this.running = false;
 	}
 
@@ -139,24 +151,28 @@ private void subscribe() {
 
 		if (topics != null && !topics.isEmpty()) {
 			subscribe(topics, rebalanceListener);
-		} else {
+		}
+		else {
 			subscribe(callback.getSubscribeTopicsPattern(), rebalanceListener);
 		}
 	}
 
-	private void subscribe(Collection<String> topics, ConsumerRebalanceListener callback){
-		if (callback == null) {
+	private void subscribe(Collection<String> topics, ConsumerRebalanceListener listenerCallback){
+		if (listenerCallback == null) {
 			this.parallelConsumer.subscribe(topics);
-		} else {
-			this.parallelConsumer.subscribe(topics, callback);
+		}
+		else {
+			this.parallelConsumer.subscribe(topics, listenerCallback);
 		}
 	}
 
-	private void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
-		if (callback == null) {
+	private void subscribe(Pattern pattern, ConsumerRebalanceListener listenerCallback) {
+		if (listenerCallback == null) {
 			this.parallelConsumer.subscribe(pattern);
-		} else {
-			this.parallelConsumer.subscribe(pattern, callback);
+		}
+		else {
+			this.parallelConsumer.subscribe(pattern, listenerCallback);
 		}
 	}
+
 }
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerCallback.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerCallback.java
similarity index 76%
rename from spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerCallback.java
rename to spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerCallback.java
index 6c3072a15f..29a995580f 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerCallback.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerCallback.java
@@ -14,23 +14,25 @@
  * limitations under the License.
  */
 
-package org.springframework.kafka.core;
+package org.springframework.kafka.core.parallelconsumer;
 
+import java.time.Duration;
 import java.util.List;
 import java.util.regex.Pattern;
 
-import io.confluent.parallelconsumer.PollContext;
-
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.producer.ProducerRecord;
 import org.springframework.kafka.config.ParallelConsumerContext;
+import org.springframework.kafka.core.ParallelConsumerFactory;
+
+import io.confluent.parallelconsumer.internal.DrainingCloseable.DrainingMode;
 
 /**
  * User should create ConcreteClass of this and register it as Spring Bean.
  * Concrete class of ParallelConsumerCallback will be registered {@link ParallelConsumerContext},
  * and then it will be used in {@link ParallelConsumerFactory} when ParallelConsumerFactory start.
- * @author ...
- * @since 3.2.0
+ *
+ * @author Sanghyeok An
+ * @since 3.3
  */
 
 public interface ParallelConsumerCallback<K, V> {
@@ -45,4 +47,12 @@ default Pattern getSubscribeTopicsPattern(){
 	default ConsumerRebalanceListener getRebalanceListener(){
 		return null;
 	}
+	default DrainingMode drainingMode() {
+		return DrainingMode.DONT_DRAIN;
+	}
+
+	default Duration drainTimeOut() {
+		return Duration.ofMillis(0);
+	}
+
 }
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerOptionsProvider.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerOptionsProvider.java
new file mode 100644
index 0000000000..09bfb13029
--- /dev/null
+++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerOptionsProvider.java
@@ -0,0 +1,168 @@
+/*
+ * Copyright 2014-2024 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.core.parallelconsumer;
+
+import java.time.Duration;
+import java.util.function.Function;
+
+import javax.annotation.Nullable;
+
+import org.springframework.kafka.config.ParallelConsumerContext;
+import org.springframework.kafka.core.ParallelConsumerFactory;
+
+import io.confluent.parallelconsumer.ParallelConsumerOptions;
+import io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode;
+import io.confluent.parallelconsumer.ParallelConsumerOptions.InvalidOffsetMetadataHandlingPolicy;
+import io.confluent.parallelconsumer.ParallelConsumerOptions.ParallelConsumerOptionsBuilder;
+import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder;
+import io.confluent.parallelconsumer.RecordContext;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.Tag;
+
+/**
+ *
+ * @author Sanghyeok An
+ * @since 3.3
+ */
+
+public interface ParallelConsumerOptionsProvider<K, V> {
+
+	default void hello() {
+		ParallelConsumerOptionsBuilder<Object, Object> builder = ParallelConsumerOptions.builder();
+	}
+
+	@Nullable
+	default String managedExecutorService() {
+		return null;
+	}
+
+	@Nullable
+	default String managedThreadFactory() {
+		return null;
+	}
+
+	@Nullable
+	default MeterRegistry meterRegistry() {
+		return null;
+	}
+
+	@Nullable
+	default String pcInstanceTag() {
+		return null;
+	}
+
+	@Nullable
+	default Iterable<Tag> metricsTags() {
+		return null;
+	}
+
+	@Nullable
+	default Boolean allowEagerProcessingDuringTransactionCommit() {
+		return null;
+	}
+
+	@Nullable
+	default Duration commitLockAcquisitionTimeout() {
+		return null;
+	}
+
+	@Nullable
+	default Duration produceLockAcquisitionTimeout() {
+		return null;
+	}
+
+	@Nullable
+	default Duration commitInterval() {
+		return null;
+	}
+
+	@Nullable
+	default ProcessingOrder ordering() {
+		return null;
+	}
+
+	@Nullable
+	default CommitMode commitMode() {
+		return null;
+	}
+
+	@Nullable
+	default Integer maxConcurrency() {
+		return null;
+	}
+
+	@Nullable
+	default InvalidOffsetMetadataHandlingPolicy invalidOffsetMetadataPolicy() {
+		return null;
+	}
+
+	@Nullable
+	default Function<RecordContext<K, V>, Duration> retryDelayProvider() {
+		return null;
+	}
+
+	@Nullable
+	default Duration sendTimeout() {
+		return null;
+	}
+
+	@Nullable
+	default Duration offsetCommitTimeout() {
+		return null;
+	}
+
+	@Nullable
+	default Integer batchSize() {
+		return null;
+	}
+
+	@Nullable
+	default Duration thresholdForTimeSpendInQueueWarning () {
+		return null;
+	}
+
+	@Nullable
+	default Integer maxFailureHistory() {
+		return null;
+	}
+
+	@Nullable
+	default Duration shutdownTimeout() {
+		return null;
+	}
+
+	@Nullable
+	default Duration drainTimeout() {
+		return null;
+	}
+
+	@Nullable
+	default Integer messageBufferSize() {
+		return null;
+	}
+
+	@Nullable
+	default Integer initialLoadFactor() {
+		return null;
+	}
+
+	@Nullable
+	default Integer maximumLoadFactor() {
+		return null;
+	}
+
+}
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/PollCallback.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/Poll.java
similarity index 80%
rename from spring-kafka/src/main/java/org/springframework/kafka/core/PollCallback.java
rename to spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/Poll.java
index 5b7e765dc9..bfa3f7dbd3 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/core/PollCallback.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/Poll.java
@@ -14,14 +14,10 @@
  * limitations under the License.
  */
 
-package org.springframework.kafka.core;
+package org.springframework.kafka.core.parallelconsumer;
 
-import java.util.List;
-import java.util.regex.Pattern;
-
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.producer.ProducerRecord;
 import org.springframework.kafka.config.ParallelConsumerContext;
+import org.springframework.kafka.core.ParallelConsumerFactory;
 
 import io.confluent.parallelconsumer.PollContext;
 
@@ -29,11 +25,12 @@
  * User should create ConcreteClass of this and register it as Spring Bean.
  * Concrete class of ParallelConsumerCallback will be registered {@link ParallelConsumerContext},
  * and then it will be used in {@link ParallelConsumerFactory} when ParallelConsumerFactory start.
- * @author ...
- * @since 3.2.0
+ *
+ * @author Sanghyeok An
+ * @since 3.3
  */
 
-public interface PollCallback<K, V> extends ParallelConsumerCallback<K, V> {
+public interface Poll<K, V> extends ParallelConsumerCallback<K, V> {
 
 	/**
 	 * This is for {@link ParallelConsumerFactory} and {@link ParallelConsumerContext}.
@@ -42,4 +39,5 @@ public interface PollCallback<K, V> extends ParallelConsumerCallback<K, V> {
 	 * @return void.
 	 */
 	void accept(PollContext<K, V> context);
+
 }
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceResultCallback.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduce.java
similarity index 72%
rename from spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceResultCallback.java
rename to spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduce.java
index 57f1f68732..1b00ed251d 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceResultCallback.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduce.java
@@ -14,18 +14,24 @@
  * limitations under the License.
  */
 
-package org.springframework.kafka.core;
-
-import java.util.function.Consumer;
+package org.springframework.kafka.core.parallelconsumer;
 
 import org.apache.kafka.clients.producer.ProducerRecord;
 
-import io.confluent.parallelconsumer.ParallelStreamProcessor.ConsumeProduceResult;
 import io.confluent.parallelconsumer.PollContext;
 
 /**
  * ...
+ *
+ * @author Sanghyeok An
+ * @since 3.3
  */
 
-public interface PollAndProduceResultCallback<K, V> extends PollAndProduceCallback<K, V>, ResultConsumerCallback<K, V> {
+public interface PollAndProduce<K, V> extends ParallelConsumerCallback<K, V> {
+
+	/**
+	 * ...
+	 */
+	ProducerRecord<K, V> accept(PollContext<K, V> context);
+
 }
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceManyResultCallback.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceMany.java
similarity index 73%
rename from spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceManyResultCallback.java
rename to spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceMany.java
index 885991cf3c..2d46391c39 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceManyResultCallback.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceMany.java
@@ -14,19 +14,26 @@
  * limitations under the License.
  */
 
-package org.springframework.kafka.core;
+package org.springframework.kafka.core.parallelconsumer;
 
 import java.util.List;
-import java.util.function.Consumer;
 
 import org.apache.kafka.clients.producer.ProducerRecord;
 
-import io.confluent.parallelconsumer.ParallelStreamProcessor.ConsumeProduceResult;
 import io.confluent.parallelconsumer.PollContext;
 
 /**
  * ...
+ *
+ * @author Sanghyeok An
+ * @since 3.3
  */
 
-public interface PollAndProduceManyResultCallback<K, V> extends PollAndProduceManyCallback<K, V>, ResultConsumerCallback<K, V> {
+public interface PollAndProduceMany<K, V> extends ParallelConsumerCallback<K, V> {
+
+	/**
+	 * ...
+	 */
+	List<ProducerRecord<K, V>> accept(PollContext<K, V> context);
+
 }
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceCallback.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceManyResult.java
similarity index 51%
rename from spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceCallback.java
rename to spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceManyResult.java
index 3d6d27f253..b9be8d9db0 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceCallback.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceManyResult.java
@@ -14,27 +14,18 @@
  * limitations under the License.
  */
 
-package org.springframework.kafka.core;
+package org.springframework.kafka.core.parallelconsumer;
 
-import java.util.List;
-import java.util.function.Consumer;
-import java.util.regex.Pattern;
-
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.springframework.kafka.config.ParallelConsumerContext;
-
-import io.confluent.parallelconsumer.ParallelStreamProcessor.ConsumeProduceResult;
-import io.confluent.parallelconsumer.PollContext;
+import org.springframework.kafka.core.ResultConsumerCallback;
 
 /**
  * ...
+ *
+ * @author Sanghyeok An
+ * @since 3.3
  */
 
-public interface PollAndProduceCallback<K, V> extends ParallelConsumerCallback<K, V> {
+public interface PollAndProduceManyResult<K, V> extends PollAndProduceMany<K, V>,
+														ResultConsumerCallback<K, V> {
 
-	/**
-	 * ...
-	 */
-	ProducerRecord<K, V> accept(PollContext<K, V> context);
 }
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceManyCallback.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceResult.java
similarity index 50%
rename from spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceManyCallback.java
rename to spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceResult.java
index 8868f9ffe5..fca8fe9d5a 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceManyCallback.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceResult.java
@@ -14,27 +14,18 @@
  * limitations under the License.
  */
 
-package org.springframework.kafka.core;
+package org.springframework.kafka.core.parallelconsumer;
 
-import java.util.List;
-import java.util.function.Consumer;
-import java.util.regex.Pattern;
-
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.springframework.kafka.config.ParallelConsumerContext;
-
-import io.confluent.parallelconsumer.ParallelStreamProcessor.ConsumeProduceResult;
-import io.confluent.parallelconsumer.PollContext;
+import org.springframework.kafka.core.ResultConsumerCallback;
 
 /**
  * ...
+ *
+ * @author Sanghyeok An
+ * @since 3.3
  */
 
-public interface PollAndProduceManyCallback<K, V> extends ParallelConsumerCallback<K, V> {
+public interface PollAndProduceResult<K, V> extends PollAndProduce<K, V>,
+													ResultConsumerCallback<K, V> {
 
-	/**
-	 * ...
-	 */
-	List<ProducerRecord<K, V>> accept(PollContext<K, V> context);
 }

From 989cef16a222dd02d4b598aa483ce7f797a26347 Mon Sep 17 00:00:00 2001
From: chickenchickenlove <ojt90902@naver.com>
Date: Sun, 28 Apr 2024 08:38:41 +0900
Subject: [PATCH 4/5] Add doc

---
 ...allelConsumerOptionsProviderCondition.java | 27 ++++++++++++++
 .../kafka/config/ParallelConsumerConfig.java  |  4 ++-
 .../config/ParallelConsumerConfiguration.java | 12 ++++---
 .../kafka/config/ParallelConsumerContext.java | 17 ++++-----
 .../ParallelConsumerImportSelector.java       |  9 +++--
 .../kafka/core/ParallelConsumerFactory.java   | 15 ++++----
 .../kafka/core/ResultConsumerCallback.java    |  9 -----
 .../ParallelConsumerOptionsProvider.java      | 17 +++++----
 .../ParallelConsumerResultInterface.java      | 35 +++++++++++++++++++
 ...ava => ParallelConsumerRootInterface.java} | 14 ++++----
 .../kafka/core/parallelconsumer/Poll.java     | 13 ++++---
 .../core/parallelconsumer/PollAndProduce.java | 12 +++++--
 .../parallelconsumer/PollAndProduceMany.java  | 12 +++++--
 .../PollAndProduceManyResult.java             | 18 ++++++++--
 .../PollAndProduceResult.java                 | 18 ++++++++--
 15 files changed, 169 insertions(+), 63 deletions(-)
 delete mode 100644 spring-kafka/src/main/java/org/springframework/kafka/core/ResultConsumerCallback.java
 create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerResultInterface.java
 rename spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/{ParallelConsumerCallback.java => ParallelConsumerRootInterface.java} (74%)

diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/OnMissingParallelConsumerOptionsProviderCondition.java b/spring-kafka/src/main/java/org/springframework/kafka/config/OnMissingParallelConsumerOptionsProviderCondition.java
index a383383755..615ac24ffc 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/config/OnMissingParallelConsumerOptionsProviderCondition.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/config/OnMissingParallelConsumerOptionsProviderCondition.java
@@ -1,10 +1,37 @@
+/*
+ * Copyright 2016-2024 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.springframework.kafka.config;
 
+import org.springframework.context.ApplicationContext;
 import org.springframework.context.annotation.Condition;
 import org.springframework.context.annotation.ConditionContext;
 import org.springframework.core.type.AnnotatedTypeMetadata;
 import org.springframework.kafka.core.parallelconsumer.ParallelConsumerOptionsProvider;
 
+/**
+ * This class is to provide {@link ParallelConsumerOptionsProvider} as default.
+ * {@link ParallelConsumerConfiguration} use this function when {@link ApplicationContext} started.
+ * If there is no spring bean in {@link ApplicationContext} such as {@link ParallelConsumerOptionsProvider},
+ * {@link ParallelConsumerConfiguration} will register default {@link ParallelConsumerOptionsProvider}.
+ *
+ * @author Sanghyeok An
+ *
+ * @since 3.3
+ */
 public class OnMissingParallelConsumerOptionsProviderCondition implements Condition {
 
 	@Override
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfig.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfig.java
index 8cb5557f35..a7eb9734b9 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfig.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfig.java
@@ -30,7 +30,9 @@
 /**
  * ParallelConsumerConfig is for config of {@link io.confluent.parallelconsumer}.
  * This will be registered as Spring Bean when {@link EnableParallelConsumer} is annotated to your spring application.
- * @author ...
+ *
+ * @author Sanghyeok An
+ *
  * @since 3.3
  */
 
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfiguration.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfiguration.java
index da5e556826..ea6557fde0 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfiguration.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfiguration.java
@@ -20,7 +20,7 @@
 import org.springframework.context.annotation.Conditional;
 import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
 import org.springframework.kafka.core.DefaultKafkaProducerFactory;
-import org.springframework.kafka.core.parallelconsumer.ParallelConsumerCallback;
+import org.springframework.kafka.core.parallelconsumer.ParallelConsumerRootInterface;
 import org.springframework.kafka.core.ParallelConsumerFactory;
 import org.springframework.kafka.annotation.EnableParallelConsumer;
 import org.springframework.kafka.core.parallelconsumer.ParallelConsumerOptionsProvider;
@@ -30,9 +30,11 @@
  * If so, this class will register both {@link ParallelConsumerContext} and {@link ParallelConsumerFactory} as Spring Bean.
  * User has responsibility
  *   1. annotated {@link EnableParallelConsumer} on their spring application
- *   2. register ConcreteClass of {@link ParallelConsumerCallback}.
- * @author ...
- * @since 3.2.0
+ *   2. register ConcreteClass of {@link ParallelConsumerRootInterface}.
+ *
+ * @author Sanghyoek An
+ *
+ * @since 3.3
  */
 
 public class ParallelConsumerConfiguration<K, V> {
@@ -50,7 +52,7 @@ public ParallelConsumerConfig<K, V> parallelConsumerConfig(ParallelConsumerOptio
 
 	@Bean(name = ParallelConsumerContext.DEFAULT_BEAN_NAME)
 	public ParallelConsumerContext<K,V> parallelConsumerContext(ParallelConsumerConfig<K, V> parallelConsumerConfig,
-																ParallelConsumerCallback<K, V> parallelConsumerCallback) {
+																ParallelConsumerRootInterface<K, V> parallelConsumerCallback) {
 		return new ParallelConsumerContext<K, V>(parallelConsumerConfig,
 												 parallelConsumerCallback);
 	}
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerContext.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerContext.java
index 4bb2579a13..205be1e5f2 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerContext.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerContext.java
@@ -18,16 +18,17 @@
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.producer.Producer;
-import org.springframework.kafka.core.parallelconsumer.ParallelConsumerCallback;
+import org.springframework.kafka.core.parallelconsumer.ParallelConsumerRootInterface;
 
 import io.confluent.parallelconsumer.ParallelConsumerOptions;
 import io.confluent.parallelconsumer.ParallelConsumerOptions.ParallelConsumerOptionsBuilder;
-import io.confluent.parallelconsumer.ParallelStreamProcessor;
 
 /**
- * This class is for aggregating all related with ParallelConsumer.
- * @author ...
- * @since 3.2.0
+ * This class is for collecting all related with ParallelConsumer.
+ *
+ * @author Sanghyeok An
+ *
+ * @since 3.3
  */
 
 
@@ -35,15 +36,15 @@ public class ParallelConsumerContext<K,V> {
 
 	public static final String DEFAULT_BEAN_NAME = "parallelConsumerContext";
 	private final ParallelConsumerConfig parallelConsumerConfig;
-	private final ParallelConsumerCallback<K, V> parallelConsumerCallback;
+	private final ParallelConsumerRootInterface<K, V> parallelConsumerCallback;
 
 	public ParallelConsumerContext(ParallelConsumerConfig<K, V> parallelConsumerConfig,
-								   ParallelConsumerCallback<K, V> callback) {
+								   ParallelConsumerRootInterface<K, V> callback) {
 		this.parallelConsumerConfig = parallelConsumerConfig;
 		this.parallelConsumerCallback = callback;
 	}
 
-	public ParallelConsumerCallback<K, V> parallelConsumerCallback() {
+	public ParallelConsumerRootInterface<K, V> parallelConsumerCallback() {
 		return this.parallelConsumerCallback;
 	}
 
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerImportSelector.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerImportSelector.java
index 9cc32b013c..938addcae1 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerImportSelector.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerImportSelector.java
@@ -19,12 +19,15 @@
 import org.springframework.context.annotation.ImportSelector;
 import org.springframework.core.type.AnnotationMetadata;
 import org.springframework.kafka.annotation.EnableParallelConsumer;
+
 /**
  * ParallelConsumerImportSelector is to register {@link ParallelConsumerConfiguration}.
  * If you want to import {@link ParallelConsumerConfiguration} to your application,
  * you just annotated {@link EnableParallelConsumer} to your spring application.
- * @author ...
- * @since 3.2.0
+ *
+ * @author Sanghyeok An
+ *
+ * @since 3.3
  */
 
 public class ParallelConsumerImportSelector implements ImportSelector {
@@ -32,4 +35,4 @@ public class ParallelConsumerImportSelector implements ImportSelector {
 	public String[] selectImports(AnnotationMetadata importingClassMetadata) {
 		return new String[]{ParallelConsumerConfiguration.class.getName()};
 	}
-}
\ No newline at end of file
+}
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerFactory.java
index 43b77c1df1..5a22685f15 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerFactory.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerFactory.java
@@ -26,12 +26,13 @@
 import org.apache.kafka.clients.producer.Producer;
 import org.springframework.context.SmartLifecycle;
 import org.springframework.kafka.config.ParallelConsumerContext;
-import org.springframework.kafka.core.parallelconsumer.ParallelConsumerCallback;
+import org.springframework.kafka.core.parallelconsumer.ParallelConsumerRootInterface;
 import org.springframework.kafka.core.parallelconsumer.PollAndProduce;
 import org.springframework.kafka.core.parallelconsumer.PollAndProduceMany;
 import org.springframework.kafka.core.parallelconsumer.PollAndProduceManyResult;
 import org.springframework.kafka.core.parallelconsumer.PollAndProduceResult;
 import org.springframework.kafka.core.parallelconsumer.Poll;
+import org.springframework.kafka.core.parallelconsumer.ParallelConsumerResultInterface;
 
 import io.confluent.parallelconsumer.ParallelConsumerOptions;
 import io.confluent.parallelconsumer.ParallelStreamProcessor;
@@ -40,7 +41,9 @@
 /**
  * ParallelConsumerFactory will be started and closed by Spring LifeCycle.
  * This class is quite simple, because ParallelConsumer requires delegating the situation to itself.
+ *
  * @author Sanghyeok An
+ *
  * @since 3.3
  */
 
@@ -71,7 +74,7 @@ public ParallelConsumerFactory(ParallelConsumerContext<K, V> parallelConsumerCon
 
 	private ParallelConsumerOptions<K, V> parallelConsumerOptions(Consumer<K, V> consumer,
 																  Producer<K, V> producer) {
-		final ParallelConsumerCallback<K, V> callback = parallelConsumerContext.parallelConsumerCallback();
+		final ParallelConsumerRootInterface<K, V> callback = parallelConsumerContext.parallelConsumerCallback();
 		if (callback instanceof PollAndProduceMany<K,V> ||
 			callback instanceof PollAndProduce<K,V>) {
 			return parallelConsumerContext.getParallelConsumerOptions(consumer, producer);
@@ -84,9 +87,9 @@ private ParallelConsumerOptions<K, V> parallelConsumerOptions(Consumer<K, V> con
 	public void start() {
 		subscribe();
 
-		final ParallelConsumerCallback<K, V> callback0 = parallelConsumerContext.parallelConsumerCallback();
+		final ParallelConsumerRootInterface<K, V> callback0 = parallelConsumerContext.parallelConsumerCallback();
 
-		if (callback0 instanceof ResultConsumerCallback) {
+		if (callback0 instanceof ParallelConsumerResultInterface) {
 			if (callback0 instanceof PollAndProduceManyResult<K, V>) {
 				final PollAndProduceManyResult<K, V> callback =
 						(PollAndProduceManyResult<K, V>) callback0;
@@ -129,7 +132,7 @@ else if (callback0 instanceof Poll<K, V>) {
 
 	@Override
 	public void stop() {
-		final ParallelConsumerCallback<K, V> callback =
+		final ParallelConsumerRootInterface<K, V> callback =
 				this.parallelConsumerContext.parallelConsumerCallback();
 		final DrainingMode drainingMode = callback.drainingMode();
 		final Duration duration = callback.drainTimeOut();
@@ -144,7 +147,7 @@ public boolean isRunning() {
 	}
 
 	private void subscribe() {
-		final ParallelConsumerCallback<K, V> callback = this.parallelConsumerContext.parallelConsumerCallback();
+		final ParallelConsumerRootInterface<K, V> callback = this.parallelConsumerContext.parallelConsumerCallback();
 
 		final List<String> topics = callback.getTopics();
 		final ConsumerRebalanceListener rebalanceListener = callback.getRebalanceListener();
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/ResultConsumerCallback.java b/spring-kafka/src/main/java/org/springframework/kafka/core/ResultConsumerCallback.java
deleted file mode 100644
index 07a32cf437..0000000000
--- a/spring-kafka/src/main/java/org/springframework/kafka/core/ResultConsumerCallback.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package org.springframework.kafka.core;
-
-import java.util.function.Consumer;
-
-import io.confluent.parallelconsumer.ParallelStreamProcessor.ConsumeProduceResult;
-
-public interface ResultConsumerCallback<K, V> {
-	Consumer<ConsumeProduceResult<K, V, K, V>> resultConsumer(ConsumeProduceResult<K, V, K, V> result);
-}
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerOptionsProvider.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerOptionsProvider.java
index 09bfb13029..5d56e0c6a5 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerOptionsProvider.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerOptionsProvider.java
@@ -21,30 +21,29 @@
 
 import javax.annotation.Nullable;
 
-import org.springframework.kafka.config.ParallelConsumerContext;
-import org.springframework.kafka.core.ParallelConsumerFactory;
-
-import io.confluent.parallelconsumer.ParallelConsumerOptions;
+import io.confluent.parallelconsumer.ParallelConsumer;
 import io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode;
 import io.confluent.parallelconsumer.ParallelConsumerOptions.InvalidOffsetMetadataHandlingPolicy;
-import io.confluent.parallelconsumer.ParallelConsumerOptions.ParallelConsumerOptionsBuilder;
 import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder;
 import io.confluent.parallelconsumer.RecordContext;
 import io.micrometer.core.instrument.MeterRegistry;
 import io.micrometer.core.instrument.Tag;
 
 /**
+ * User can configure options of {@link ParallelConsumer} via {@link ParallelConsumerOptionsProvider}.
+ * If user want to configure options of {@link ParallelConsumer}, user should implement {@link ParallelConsumerOptionsProvider}
+ * and register it as spring bean.
+ *
+ * User don't need to implement all of methods.
+ * Note : If a method returns null, that option will use the default value of the {@link ParallelConsumer}.
  *
  * @author Sanghyeok An
+ *
  * @since 3.3
  */
 
 public interface ParallelConsumerOptionsProvider<K, V> {
 
-	default void hello() {
-		ParallelConsumerOptionsBuilder<Object, Object> builder = ParallelConsumerOptions.builder();
-	}
-
 	@Nullable
 	default String managedExecutorService() {
 		return null;
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerResultInterface.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerResultInterface.java
new file mode 100644
index 0000000000..481065d910
--- /dev/null
+++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerResultInterface.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2014-2024 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.core.parallelconsumer;
+
+import java.util.function.Consumer;
+
+import io.confluent.parallelconsumer.ParallelStreamProcessor.ConsumeProduceResult;
+
+/**
+ * This interface is an interface that marks whether there is a Callback for {@link ConsumeProduceResult}.
+ * Users should implement one of {@link Poll}, {@link PollAndProduce}, {@link PollAndProduceResult}
+ * , {@link PollAndProduceMany}, {@link PollAndProduceManyResult} instead of {@link ParallelConsumerResultInterface}.
+ *
+ * @author Sanghyeok An
+ *
+ * @since 3.3
+ */
+
+public interface ParallelConsumerResultInterface<K, V> {
+	Consumer<ConsumeProduceResult<K, V, K, V>> resultConsumer(ConsumeProduceResult<K, V, K, V> result);
+}
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerCallback.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerRootInterface.java
similarity index 74%
rename from spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerCallback.java
rename to spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerRootInterface.java
index 29a995580f..9b6380c240 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerCallback.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerRootInterface.java
@@ -21,21 +21,21 @@
 import java.util.regex.Pattern;
 
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.springframework.kafka.config.ParallelConsumerContext;
-import org.springframework.kafka.core.ParallelConsumerFactory;
 
 import io.confluent.parallelconsumer.internal.DrainingCloseable.DrainingMode;
 
 /**
- * User should create ConcreteClass of this and register it as Spring Bean.
- * Concrete class of ParallelConsumerCallback will be registered {@link ParallelConsumerContext},
- * and then it will be used in {@link ParallelConsumerFactory} when ParallelConsumerFactory start.
- *
+ * This interface provides a common interface for sub-interfaces.
+ * Users should not implement this interface.
+ * Users should implement one of {@link Poll}, {@link PollAndProduce}, {@link PollAndProduceResult}
+ * , {@link PollAndProduceMany}, {@link PollAndProduceManyResult} instead of {@link ParallelConsumerRootInterface}.
+
  * @author Sanghyeok An
+ *
  * @since 3.3
  */
 
-public interface ParallelConsumerCallback<K, V> {
+public interface ParallelConsumerRootInterface<K, V> {
 
 	/**
 	 * ...
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/Poll.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/Poll.java
index bfa3f7dbd3..f3a399d979 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/Poll.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/Poll.java
@@ -16,21 +16,26 @@
 
 package org.springframework.kafka.core.parallelconsumer;
 
+import java.util.function.Consumer;
+
 import org.springframework.kafka.config.ParallelConsumerContext;
 import org.springframework.kafka.core.ParallelConsumerFactory;
 
+import io.confluent.parallelconsumer.ParallelStreamProcessor;
 import io.confluent.parallelconsumer.PollContext;
 
 /**
- * User should create ConcreteClass of this and register it as Spring Bean.
- * Concrete class of ParallelConsumerCallback will be registered {@link ParallelConsumerContext},
- * and then it will be used in {@link ParallelConsumerFactory} when ParallelConsumerFactory start.
+ * This interface is intended for use when the user does not use the producer after consuming.
+ * User should implement {@link Poll} and register it as Spring Bean.
+ * {@link Poll#accept(PollContext)} will be called by {@link ParallelStreamProcessor#poll(Consumer)}
+ * when {@link ParallelConsumerFactory} started.
  *
  * @author Sanghyeok An
+ *
  * @since 3.3
  */
 
-public interface Poll<K, V> extends ParallelConsumerCallback<K, V> {
+public interface Poll<K, V> extends ParallelConsumerRootInterface<K, V> {
 
 	/**
 	 * This is for {@link ParallelConsumerFactory} and {@link ParallelConsumerContext}.
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduce.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduce.java
index 1b00ed251d..1eb171d24e 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduce.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduce.java
@@ -16,18 +16,26 @@
 
 package org.springframework.kafka.core.parallelconsumer;
 
+import java.util.function.Function;
+
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.springframework.kafka.core.ParallelConsumerFactory;
 
+import io.confluent.parallelconsumer.ParallelStreamProcessor;
 import io.confluent.parallelconsumer.PollContext;
 
 /**
- * ...
+ * This interface is intended for use when the user use the producer after consuming.
+ * User should implement {@link PollAndProduce} and register it as Spring Bean.
+ * {@link PollAndProduce#accept(PollContext)} will be called by {@link ParallelStreamProcessor#pollAndProduce(Function)}
+ * when {@link ParallelConsumerFactory} started.
  *
  * @author Sanghyeok An
+ *
  * @since 3.3
  */
 
-public interface PollAndProduce<K, V> extends ParallelConsumerCallback<K, V> {
+public interface PollAndProduce<K, V> extends ParallelConsumerRootInterface<K, V> {
 
 	/**
 	 * ...
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceMany.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceMany.java
index 2d46391c39..5d7f20c04f 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceMany.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceMany.java
@@ -17,19 +17,25 @@
 package org.springframework.kafka.core.parallelconsumer;
 
 import java.util.List;
+import java.util.function.Function;
 
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.springframework.kafka.core.ParallelConsumerFactory;
 
+import io.confluent.parallelconsumer.ParallelStreamProcessor;
 import io.confluent.parallelconsumer.PollContext;
 
 /**
- * ...
- *
+ * This interface is intended for use when the user use the producer after consuming.
+ * User should implement {@link PollAndProduceMany} and register it as Spring Bean.
+ * {@link PollAndProduceMany#accept(PollContext)} will be called by {@link ParallelStreamProcessor#pollAndProduceMany(Function)}
+ * when {@link ParallelConsumerFactory} started.
  * @author Sanghyeok An
+ *
  * @since 3.3
  */
 
-public interface PollAndProduceMany<K, V> extends ParallelConsumerCallback<K, V> {
+public interface PollAndProduceMany<K, V> extends ParallelConsumerRootInterface<K, V> {
 
 	/**
 	 * ...
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceManyResult.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceManyResult.java
index b9be8d9db0..810c8af97b 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceManyResult.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceManyResult.java
@@ -16,16 +16,28 @@
 
 package org.springframework.kafka.core.parallelconsumer;
 
-import org.springframework.kafka.core.ResultConsumerCallback;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import org.springframework.kafka.core.ParallelConsumerFactory;
+
+import io.confluent.parallelconsumer.ParallelStreamProcessor;
+import io.confluent.parallelconsumer.ParallelStreamProcessor.ConsumeProduceResult;
+import io.confluent.parallelconsumer.PollContext;
 
 /**
- * ...
+ * This interface is intended for use when the user use the producer after consuming.
+ * User should implement {@link PollAndProduce} and register it as Spring Bean.
+ * {@link PollAndProduceManyResult#accept(PollContext)} and {@link PollAndProduceManyResult#resultConsumer(ConsumeProduceResult)}
+ * will be called by {@link ParallelStreamProcessor#pollAndProduceMany(Function, Consumer)}
+ * when {@link ParallelConsumerFactory} started.
  *
  * @author Sanghyeok An
+ *
  * @since 3.3
  */
 
 public interface PollAndProduceManyResult<K, V> extends PollAndProduceMany<K, V>,
-														ResultConsumerCallback<K, V> {
+														ParallelConsumerResultInterface<K, V> {
 
 }
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceResult.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceResult.java
index fca8fe9d5a..fbf9c4c48c 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceResult.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceResult.java
@@ -16,16 +16,28 @@
 
 package org.springframework.kafka.core.parallelconsumer;
 
-import org.springframework.kafka.core.ResultConsumerCallback;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import org.springframework.kafka.core.ParallelConsumerFactory;
+
+import io.confluent.parallelconsumer.ParallelStreamProcessor;
+import io.confluent.parallelconsumer.ParallelStreamProcessor.ConsumeProduceResult;
+import io.confluent.parallelconsumer.PollContext;
 
 /**
- * ...
+ * This interface is intended for use when the user use the producer after consuming.
+ * User should implement {@link PollAndProduceResult} and register it as Spring Bean.
+ * Both {@link PollAndProduceResult#accept(PollContext)} {@link PollAndProduceResult#resultConsumer(ConsumeProduceResult)}
+ * will be called by {@link ParallelStreamProcessor#pollAndProduce(Function, Consumer)}
+ * when {@link ParallelConsumerFactory} started.
  *
  * @author Sanghyeok An
+ *
  * @since 3.3
  */
 
 public interface PollAndProduceResult<K, V> extends PollAndProduce<K, V>,
-													ResultConsumerCallback<K, V> {
+													ParallelConsumerResultInterface<K, V> {
 
 }

From 1790392bbff4df1134a07ef5ba8efcf466872d92 Mon Sep 17 00:00:00 2001
From: chickenchickenlove <ojt90902@naver.com>
Date: Sun, 28 Apr 2024 08:39:18 +0900
Subject: [PATCH 5/5] fix miss typo

---
 .../kafka/annotation/EnableParallelConsumer.java          | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableParallelConsumer.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableParallelConsumer.java
index 6966edbde3..0cf4e7a5e5 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableParallelConsumer.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableParallelConsumer.java
@@ -28,12 +28,14 @@
 /**
  * If you want to import {@link ParallelConsumerConfiguration} to your application,
  * you just annotated {@link EnableParallelConsumer} to your spring application.
- * @author ...
- * @since 3.2.0
+ *
+ * @author Sanghyeok An
+ *
+ * @since 3.3
  */
 
 @Target(ElementType.TYPE)
 @Retention(RetentionPolicy.RUNTIME)
 @Import(ParallelConsumerImportSelector.class)
 public @interface EnableParallelConsumer {
-}
\ No newline at end of file
+}