diff --git a/itests/qtest-druid/pom.xml b/itests/qtest-druid/pom.xml
index 2e3b84cbf31a..4c1036b03595 100644
--- a/itests/qtest-druid/pom.xml
+++ b/itests/qtest-druid/pom.xml
@@ -36,7 +36,7 @@
     10.11.1.1
     16.0.1
     4.1.0
-    <kafka.test.version>2.5.0</kafka.test.version>
+    <kafka.test.version>3.9.1</kafka.test.version>
     4.1.0
     1.7.30
@@ -226,6 +226,11 @@
       <artifactId>kafka-clients</artifactId>
       <version>${kafka.test.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-server</artifactId>
+      <version>${kafka.test.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
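Kafka 3.x split broker-side code out of the old Scala `core` artifact into a separate `kafka-server` module, so the embedded broker below presumably needs it on the test classpath in addition to `kafka-clients`. What the bumped property actually resolves to can be checked with plain Maven: `mvn -pl itests/qtest-druid dependency:tree -Dincludes=org.apache.kafka`.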
diff --git a/itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java b/itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java
index 746830a9a6b8..528b8a6649c2 100644
--- a/itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java
+++ b/itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java
@@ -19,7 +19,7 @@
package org.apache.hive.kafka;
import kafka.server.KafkaConfig;
-import kafka.server.KafkaServerStartable;
+import kafka.server.KafkaServer;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.service.AbstractService;
@@ -29,6 +29,7 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Time;
import com.google.common.base.Throwables;
import com.google.common.io.Files;
@@ -43,6 +44,7 @@
import java.util.List;
import java.util.Properties;
import java.util.stream.IntStream;
+import scala.Option;
/**
* This class has the hooks to start and stop single node kafka cluster.
@@ -54,7 +56,7 @@ public class SingleNodeKafkaCluster extends AbstractService {
private static final String LOCALHOST = "localhost";
- private final KafkaServerStartable serverStartable;
+ private final KafkaServer server;
private final int brokerPort;
private final String kafkaServer;
@@ -94,13 +96,13 @@ public SingleNodeKafkaCluster(String name, String logDir, Integer zkPort, Integer
properties.setProperty("transaction.state.log.min.isr", String.valueOf(1));
properties.setProperty("log.cleaner.dedupe.buffer.size", "1048577");
- this.serverStartable = new KafkaServerStartable(KafkaConfig.fromProps(properties));
+ this.server = new KafkaServer(KafkaConfig.fromProps(properties), Time.SYSTEM, Option.empty(), false);
}
@Override
protected void serviceStart() throws Exception {
- serverStartable.startup();
+ server.startup();
log.info("Kafka Server Started on port {}", brokerPort);
}
@@ -108,7 +110,7 @@ protected void serviceStart() throws Exception {
@Override
protected void serviceStop() throws Exception {
log.info("Stopping Kafka Server");
- serverStartable.shutdown();
+ server.shutdown();
log.info("Kafka Server Stopped");
}
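`KafkaServerStartable`, a thin lifecycle wrapper around the broker, is gone in Kafka 3.x, so the test cluster now drives `kafka.server.KafkaServer` directly. The extra constructor arguments are Scala default parameters (`threadNamePrefix: Option[String] = None`, `enableForwarding: Boolean = false`) that Java callers must spell out. A minimal sketch of the same lifecycle, assuming a ZooKeeper already running at the placeholder address; ports and paths are illustrative, not taken from the patch:

```java
import java.util.Properties;

import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import org.apache.kafka.common.utils.Time;
import scala.Option;

public final class EmbeddedBrokerSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("zookeeper.connect", "localhost:2181");     // placeholder ZK address
    props.setProperty("listeners", "PLAINTEXT://localhost:9092"); // placeholder broker port
    props.setProperty("log.dir", "/tmp/embedded-kafka-logs");     // placeholder log dir
    props.setProperty("offsets.topic.replication.factor", "1");

    // KafkaServer(config, time, threadNamePrefix, enableForwarding): the last two
    // parameters have Scala defaults that are invisible from Java.
    KafkaServer server =
        new KafkaServer(KafkaConfig.fromProps(props), Time.SYSTEM, Option.empty(), false);
    server.startup();
    try {
      // ... exercise the broker on localhost:9092 ...
    } finally {
      server.shutdown();
      server.awaitShutdown();
    }
  }
}
```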
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java
index 7a7d0360a015..72eeabbf1f8f 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java
@@ -33,6 +33,7 @@
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.Uuid;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,6 +45,7 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.Future;
/**
@@ -67,6 +69,11 @@ class HiveKafkaProducer<K, V> implements Producer<K, V> {
kafkaProducer = new KafkaProducer<>(properties);
}
+ @Override
+ public Uuid clientInstanceId(Duration timeout) {
+ throw new UnsupportedOperationException();
+ }
+
@Override public void initTransactions() {
kafkaProducer.initTransactions();
}
@@ -138,11 +145,11 @@ synchronized void resumeTransaction(long producerId, short epoch) {
Object transactionManager = getValue(kafkaProducer, "transactionManager");
- Object topicPartitionBookkeeper = getValue(transactionManager, "topicPartitionBookkeeper");
+ Object txnPartitionMap = getValue(transactionManager, "txnPartitionMap");
invoke(transactionManager,
"transitionTo",
getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
- invoke(topicPartitionBookkeeper, "reset");
+ invoke(txnPartitionMap, "reset");
Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
setValue(producerIdAndEpoch, "producerId", producerId);
setValue(producerIdAndEpoch, "epoch", epoch);
@@ -181,10 +188,15 @@ short getEpoch() {
*/
private void flushNewPartitions() {
LOG.info("Flushing new partitions");
- TransactionalRequestResult result = enqueueNewPartitions();
- Object sender = getValue(kafkaProducer, "sender");
- invoke(sender, "wakeup");
- result.await();
+ Object transactionManager = getValue(kafkaProducer, "transactionManager");
+    Set<TopicPartition> newPartitionsInTransaction =
+        (Set<TopicPartition>) getValue(transactionManager, "newPartitionsInTransaction");
+ if (!newPartitionsInTransaction.isEmpty()) {
+ TransactionalRequestResult result = enqueueNewPartitions();
+ Object sender = getValue(kafkaProducer, "sender");
+ invoke(sender, "wakeup");
+ result.await();
+ }
}
private synchronized TransactionalRequestResult enqueueNewPartitions() {
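Two separate 3.x breakages are handled here. First, the reflective transaction-resume hack chases a rename inside the clients' internal `TransactionManager`: the `topicPartitionBookkeeper` field became `txnPartitionMap` (both expose a `reset()`). Second, `flushNewPartitions()` now skips the enqueue entirely when no partitions are pending, because with 3.x clients an add-partitions request built over an empty set appears never to complete, leaving `result.await()` blocked forever. None of these internals are public API, so every client upgrade can break this code again. For reference, a hedged sketch of what helpers like `getValue`/`invoke` typically look like; the real ones live elsewhere in this class and may differ (for example, by walking the superclass hierarchy):

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;

final class ReflectionSketch {
  // Read a private field; getDeclaredField only checks the exact class,
  // so a production helper may also need to search superclasses.
  static Object getValue(Object target, String fieldName) throws ReflectiveOperationException {
    Field f = target.getClass().getDeclaredField(fieldName);
    f.setAccessible(true);
    return f.get(target);
  }

  // Invoke a private no-arg method such as txnPartitionMap.reset().
  static Object invoke(Object target, String methodName) throws ReflectiveOperationException {
    Method m = target.getClass().getDeclaredMethod(methodName);
    m.setAccessible(true);
    return m.invoke(target);
  }
}
```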
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java
index 74614dea9168..66784d28cbb1 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java
@@ -140,7 +140,7 @@ class KafkaRecordIterator implements Iterator<ConsumerRecord<byte[], byte[]>> {
}
} else {
// case seek to beginning of stream
- consumer.seekToBeginning(Collections.singleton(topicPartition));
+ consumer.seekToBeginning(topicPartitionList);
// seekToBeginning is lazy thus need to call position() or poll(0)
this.startOffset = consumer.position(topicPartition);
LOG.info("Consumer at beginning of topic partition [{}], current start offset [{}]",
diff --git a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/HiveKafkaProducerTest.java b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/HiveKafkaProducerTest.java
index 8c9ed5f99b19..934e8eb30fba 100644
--- a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/HiveKafkaProducerTest.java
+++ b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/HiveKafkaProducerTest.java
@@ -25,6 +25,7 @@
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -158,7 +159,9 @@
@Test(expected = org.apache.kafka.common.KafkaException.class) public void testWrongEpochAndId() {
HiveKafkaProducer secondProducer = new HiveKafkaProducer(producerProperties);
secondProducer.resumeTransaction(3434L, (short) 12);
- secondProducer.sendOffsetsToTransaction(ImmutableMap.of(), "__dummy_consumer_group");
+ secondProducer.sendOffsetsToTransaction(Collections.singletonMap(
+ new TopicPartition("dummy_topic", 0),
+ new OffsetAndMetadata(0L)), "__dummy_consumer_group");
secondProducer.close(Duration.ZERO);
}
@@ -169,7 +172,9 @@
producer.close(Duration.ZERO);
HiveKafkaProducer secondProducer = new HiveKafkaProducer(producerProperties);
secondProducer.resumeTransaction(pid, (short) 12);
- secondProducer.sendOffsetsToTransaction(ImmutableMap.of(), "__dummy_consumer_group");
+ secondProducer.sendOffsetsToTransaction(Collections.singletonMap(
+ new TopicPartition("dummy_topic", 0),
+ new OffsetAndMetadata(0L)), "__dummy_consumer_group");
secondProducer.close(Duration.ZERO);
}
@@ -180,7 +185,9 @@
producer.close(Duration.ZERO);
HiveKafkaProducer secondProducer = new HiveKafkaProducer(producerProperties);
secondProducer.resumeTransaction(45L, epoch);
- secondProducer.sendOffsetsToTransaction(ImmutableMap.of(), "__dummy_consumer_group");
+ secondProducer.sendOffsetsToTransaction(Collections.singletonMap(
+ new TopicPartition("dummy_topic", 0),
+ new OffsetAndMetadata(0L)), "__dummy_consumer_group");
secondProducer.close(Duration.ZERO);
}
}
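These three tests expect fencing errors from bogus producer id/epoch pairs. With an empty offsets map the 3.x producer appears to skip the broker round-trip altogether, so the invalid epoch was never validated and the expected `KafkaException` never surfaced; a single real `TopicPartition`/`OffsetAndMetadata` entry forces an actual transactional request the broker can reject. Note also that the `String` group-id overload used here is deprecated in 3.x in favor of the KIP-447 `ConsumerGroupMetadata` variant; a sketch of the replacement call, reusing the same dummy names:

```java
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;

final class SendOffsetsSketch {
  static void sendDummyOffsets(Producer<byte[], byte[]> producer) {
    Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(
        new TopicPartition("dummy_topic", 0), new OffsetAndMetadata(0L));
    producer.sendOffsetsToTransaction(offsets, new ConsumerGroupMetadata("__dummy_consumer_group"));
  }
}
```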
diff --git a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
index e2f8bbafe016..84a79edeca07 100644
--- a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
+++ b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
@@ -26,7 +26,7 @@
import kafka.zk.EmbeddedZookeeper;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hive.common.IPStackUtils;
-import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.common.network.ConnectionMode;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestSslUtils;
import org.junit.rules.ExternalResource;
@@ -41,6 +41,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
+import scala.Option;
/**
* Test Helper Class to start and stop a kafka broker.
@@ -106,7 +107,8 @@ KafkaBrokerResource enableSASL(String principal, String keytab) {
brokerProps.setProperty("listener.name.l2.gssapi.sasl.jaas.config", jaasConfig);
brokerProps.setProperty("listener.name.l3.gssapi.sasl.jaas.config", jaasConfig);
truststoreFile = File.createTempFile("kafka_truststore", "jks");
- brokerProps.putAll(new TestSslUtils.SslConfigsBuilder(Mode.SERVER).createNewTrustStore(truststoreFile).build());
+ brokerProps.putAll(new TestSslUtils.SslConfigsBuilder(ConnectionMode.SERVER)
+ .createNewTrustStore(truststoreFile).build());
brokerProps.setProperty("delegation.token.master.key", "AnyValueShouldDoHereItDoesntMatter");
}
brokerProps.setProperty("offsets.topic.replication.factor", "1");
@@ -116,9 +118,9 @@ KafkaBrokerResource enableSASL(String principal, String keytab) {
kafkaServer = TestUtils.createServer(config, Time.SYSTEM);
kafkaServer.startup();
kafkaServer.zkClient();
- adminZkClient = new AdminZkClient(kafkaServer.zkClient());
+ adminZkClient = new AdminZkClient(kafkaServer.zkClient(), Option.empty());
LOG.info("Creating kafka TOPIC [{}]", TOPIC);
- adminZkClient.createTopic(TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+ adminZkClient.createTopic(TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$, false);
}
/**
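Three independent signature chases in one resource class: `org.apache.kafka.common.network.Mode` appears to have been renamed `ConnectionMode` in the Kafka test utilities around 3.8; `AdminZkClient` gained an optional second constructor argument; and `createTopic` grew a trailing `usesTopicId` flag. The latter two are Scala default parameters, which Java callers cannot omit, hence the explicit `Option.empty()` and `false`:

```java
// Scala defaults written out explicitly from Java (mirrors the patch above).
AdminZkClient adminZkClient = new AdminZkClient(kafkaServer.zkClient(), Option.empty());
adminZkClient.createTopic(TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$, false);
```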
diff --git a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java
index b2dbf12817e5..3df2c8c4231a 100644
--- a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java
+++ b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java
@@ -44,6 +44,7 @@
import javax.annotation.Nullable;
import java.nio.charset.Charset;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
@@ -304,7 +305,7 @@ private static void sendData(List<ProducerRecord<byte[], byte[]>> recordList, @Nullable
@After public void tearDown() {
this.kafkaRecordIterator = null;
if (this.consumer != null) {
- this.consumer.close();
+ this.consumer.close(Duration.ZERO);
}
}
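A no-argument `consumer.close()` blocks for a default timeout (30 seconds in current clients) while it tries to leave the group and commit cleanly; in `tearDown()` the broker may already be gone, so that wait only slows the suite. The patched teardown, for reference, with the reasoning as comments:

```java
@After public void tearDown() {
  this.kafkaRecordIterator = null;
  if (this.consumer != null) {
    // Duration.ZERO frees network resources immediately and skips the
    // graceful group leave, which would otherwise wait on a dead broker.
    this.consumer.close(Duration.ZERO);
  }
}
```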
diff --git a/pom.xml b/pom.xml
index 0b6e318a1dd3..e4a248b83483 100644
--- a/pom.xml
+++ b/pom.xml
@@ -172,7 +172,7 @@
     4.13.2
     5.13.3
     5.13.3
-    <kafka.version>2.5.0</kafka.version>
+    <kafka.version>3.9.1</kafka.version>
     5.5.0
     1.11.9
     1.17.0
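The root `kafka.version` drives the kafka-handler build, while `kafka.test.version` in the qtest-druid pom drives the embedded brokers; bumping both together avoids client/broker skew, which is precisely what makes reflective access to `TransactionManager` internals rot silently. A sanity run of the handler module against the new clients: `mvn -pl kafka-handler -am test`.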