Skip to content

Commit d62c1fc

Browse files
committed
HIVE-29238:upgrade kafka version to fix CVE-2024-31141 and CVE-2021-38153
1 parent e44cf34 commit d62c1fc

File tree

8 files changed

+51
-22
lines changed

8 files changed

+51
-22
lines changed

itests/qtest-druid/pom.xml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
<druid.derby.version>10.11.1.1</druid.derby.version>
3737
<druid.guava.version>16.0.1</druid.guava.version>
3838
<druid.guice.version>4.1.0</druid.guice.version>
39-
<kafka.test.version>2.5.0</kafka.test.version>
39+
<kafka.test.version>3.9.1</kafka.test.version>
4040
<druid.guice.version>4.1.0</druid.guice.version>
4141
<slf4j.version>1.7.30</slf4j.version>
4242
</properties>
@@ -226,6 +226,11 @@
226226
<artifactId>kafka-clients</artifactId>
227227
<version>${kafka.test.version}</version>
228228
</dependency>
229+
<dependency>
230+
<groupId>org.apache.kafka</groupId>
231+
<artifactId>kafka-server</artifactId>
232+
<version>${kafka.test.version}</version>
233+
</dependency>
229234
<dependency>
230235
<groupId>org.slf4j</groupId>
231236
<artifactId>slf4j-api</artifactId>

itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
package org.apache.hive.kafka;
2020

2121
import kafka.server.KafkaConfig;
22-
import kafka.server.KafkaServerStartable;
22+
import kafka.server.KafkaServer;
2323

2424
import org.apache.commons.io.FileUtils;
2525
import org.apache.hadoop.service.AbstractService;
@@ -29,6 +29,7 @@
2929
import org.apache.kafka.clients.producer.ProducerRecord;
3030
import org.apache.kafka.common.serialization.ByteArraySerializer;
3131
import org.apache.kafka.common.serialization.StringSerializer;
32+
import org.apache.kafka.common.utils.Time;
3233

3334
import com.google.common.base.Throwables;
3435
import com.google.common.io.Files;
@@ -43,6 +44,7 @@
4344
import java.util.List;
4445
import java.util.Properties;
4546
import java.util.stream.IntStream;
47+
import scala.Option;
4648

4749
/**
4850
* This class has the hooks to start and stop single node kafka cluster.
@@ -54,7 +56,7 @@ public class SingleNodeKafkaCluster extends AbstractService {
5456
private static final String LOCALHOST = "localhost";
5557

5658

57-
private final KafkaServerStartable serverStartable;
59+
private final KafkaServer server;
5860
private final int brokerPort;
5961
private final String kafkaServer;
6062

@@ -94,21 +96,21 @@ public SingleNodeKafkaCluster(String name, String logDir, Integer zkPort, Intege
9496
properties.setProperty("transaction.state.log.min.isr", String.valueOf(1));
9597
properties.setProperty("log.cleaner.dedupe.buffer.size", "1048577");
9698

97-
this.serverStartable = new KafkaServerStartable(KafkaConfig.fromProps(properties));
99+
this.server = new KafkaServer(KafkaConfig.fromProps(properties), Time.SYSTEM, Option.empty(), false);
98100
}
99101

100102

101103
@Override
102104
protected void serviceStart() throws Exception {
103-
serverStartable.startup();
105+
server.startup();
104106
log.info("Kafka Server Started on port {}", brokerPort);
105107

106108
}
107109

108110
@Override
109111
protected void serviceStop() throws Exception {
110112
log.info("Stopping Kafka Server");
111-
serverStartable.shutdown();
113+
server.shutdown();
112114
log.info("Kafka Server Stopped");
113115
}
114116

kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.kafka.common.PartitionInfo;
3434
import org.apache.kafka.common.TopicPartition;
3535
import org.apache.kafka.common.errors.ProducerFencedException;
36+
import org.apache.kafka.common.Uuid;
3637
import org.slf4j.Logger;
3738
import org.slf4j.LoggerFactory;
3839

@@ -44,6 +45,7 @@
4445
import java.util.List;
4546
import java.util.Map;
4647
import java.util.Properties;
48+
import java.util.Set;
4749
import java.util.concurrent.Future;
4850

4951
/**
@@ -67,6 +69,11 @@ class HiveKafkaProducer<K, V> implements Producer<K, V> {
6769
kafkaProducer = new KafkaProducer<>(properties);
6870
}
6971

72+
@Override
73+
public Uuid clientInstanceId(Duration timeout) {
74+
throw new UnsupportedOperationException();
75+
}
76+
7077
@Override public void initTransactions() {
7178
kafkaProducer.initTransactions();
7279
}
@@ -138,11 +145,11 @@ synchronized void resumeTransaction(long producerId, short epoch) {
138145

139146
Object transactionManager = getValue(kafkaProducer, "transactionManager");
140147

141-
Object topicPartitionBookkeeper = getValue(transactionManager, "topicPartitionBookkeeper");
148+
Object txnPartitionMap = getValue(transactionManager, "txnPartitionMap");
142149
invoke(transactionManager,
143150
"transitionTo",
144151
getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
145-
invoke(topicPartitionBookkeeper, "reset");
152+
invoke(txnPartitionMap, "reset");
146153
Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
147154
setValue(producerIdAndEpoch, "producerId", producerId);
148155
setValue(producerIdAndEpoch, "epoch", epoch);
@@ -181,10 +188,15 @@ short getEpoch() {
181188
*/
182189
private void flushNewPartitions() {
183190
LOG.info("Flushing new partitions");
184-
TransactionalRequestResult result = enqueueNewPartitions();
185-
Object sender = getValue(kafkaProducer, "sender");
186-
invoke(sender, "wakeup");
187-
result.await();
191+
Object transactionManager = getValue(kafkaProducer, "transactionManager");
192+
Set<TopicPartition> newPartitionsInTransaction =
193+
(Set<TopicPartition>) getValue(transactionManager, "newPartitionsInTransaction");
194+
if (!newPartitionsInTransaction.isEmpty()) {
195+
TransactionalRequestResult result = enqueueNewPartitions();
196+
Object sender = getValue(kafkaProducer, "sender");
197+
invoke(sender, "wakeup");
198+
result.await();
199+
}
188200
}
189201

190202
private synchronized TransactionalRequestResult enqueueNewPartitions() {

kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ class KafkaRecordIterator implements Iterator<ConsumerRecord<byte[], byte[]>> {
140140
}
141141
} else {
142142
// case seek to beginning of stream
143-
consumer.seekToBeginning(Collections.singleton(topicPartition));
143+
consumer.seekToBeginning(topicPartitionList);
144144
// seekToBeginning is lazy thus need to call position() or poll(0)
145145
this.startOffset = consumer.position(topicPartition);
146146
LOG.info("Consumer at beginning of topic partition [{}], current start offset [{}]",

kafka-handler/src/test/org/apache/hadoop/hive/kafka/HiveKafkaProducerTest.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.kafka.clients.consumer.KafkaConsumer;
2626
import org.apache.kafka.clients.producer.ProducerConfig;
2727
import org.apache.kafka.clients.producer.ProducerRecord;
28+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2829
import org.apache.kafka.common.TopicPartition;
2930
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
3031
import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -158,7 +159,9 @@
158159
@Test(expected = org.apache.kafka.common.KafkaException.class) public void testWrongEpochAndId() {
159160
HiveKafkaProducer secondProducer = new HiveKafkaProducer(producerProperties);
160161
secondProducer.resumeTransaction(3434L, (short) 12);
161-
secondProducer.sendOffsetsToTransaction(ImmutableMap.of(), "__dummy_consumer_group");
162+
secondProducer.sendOffsetsToTransaction(Collections.singletonMap(
163+
new TopicPartition("dummy_topic", 0),
164+
new OffsetAndMetadata(0L)), "__dummy_consumer_group");
162165
secondProducer.close(Duration.ZERO);
163166
}
164167

@@ -169,7 +172,9 @@
169172
producer.close(Duration.ZERO);
170173
HiveKafkaProducer secondProducer = new HiveKafkaProducer(producerProperties);
171174
secondProducer.resumeTransaction(pid, (short) 12);
172-
secondProducer.sendOffsetsToTransaction(ImmutableMap.of(), "__dummy_consumer_group");
175+
secondProducer.sendOffsetsToTransaction(Collections.singletonMap(
176+
new TopicPartition("dummy_topic", 0),
177+
new OffsetAndMetadata(0L)), "__dummy_consumer_group");
173178
secondProducer.close(Duration.ZERO);
174179
}
175180

@@ -180,7 +185,9 @@
180185
producer.close(Duration.ZERO);
181186
HiveKafkaProducer secondProducer = new HiveKafkaProducer(producerProperties);
182187
secondProducer.resumeTransaction(45L, epoch);
183-
secondProducer.sendOffsetsToTransaction(ImmutableMap.of(), "__dummy_consumer_group");
188+
secondProducer.sendOffsetsToTransaction(Collections.singletonMap(
189+
new TopicPartition("dummy_topic", 0),
190+
new OffsetAndMetadata(0L)), "__dummy_consumer_group");
184191
secondProducer.close(Duration.ZERO);
185192
}
186193
}

kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import kafka.zk.EmbeddedZookeeper;
2727
import org.apache.commons.io.FileUtils;
2828
import org.apache.hadoop.hive.common.IPStackUtils;
29-
import org.apache.kafka.common.network.Mode;
29+
import org.apache.kafka.common.network.ConnectionMode;
3030
import org.apache.kafka.common.utils.Time;
3131
import org.apache.kafka.test.TestSslUtils;
3232
import org.junit.rules.ExternalResource;
@@ -41,6 +41,7 @@
4141
import java.util.Map;
4242
import java.util.Properties;
4343
import java.util.stream.Collectors;
44+
import scala.Option;
4445

4546
/**
4647
* Test Helper Class to start and stop a kafka broker.
@@ -106,7 +107,8 @@ KafkaBrokerResource enableSASL(String principal, String keytab) {
106107
brokerProps.setProperty("listener.name.l2.gssapi.sasl.jaas.config", jaasConfig);
107108
brokerProps.setProperty("listener.name.l3.gssapi.sasl.jaas.config", jaasConfig);
108109
truststoreFile = File.createTempFile("kafka_truststore", "jks");
109-
brokerProps.putAll(new TestSslUtils.SslConfigsBuilder(Mode.SERVER).createNewTrustStore(truststoreFile).build());
110+
brokerProps.putAll(new TestSslUtils.SslConfigsBuilder(ConnectionMode.SERVER)
111+
.createNewTrustStore(truststoreFile).build());
110112
brokerProps.setProperty("delegation.token.master.key", "AnyValueShouldDoHereItDoesntMatter");
111113
}
112114
brokerProps.setProperty("offsets.topic.replication.factor", "1");
@@ -116,9 +118,9 @@ KafkaBrokerResource enableSASL(String principal, String keytab) {
116118
kafkaServer = TestUtils.createServer(config, Time.SYSTEM);
117119
kafkaServer.startup();
118120
kafkaServer.zkClient();
119-
adminZkClient = new AdminZkClient(kafkaServer.zkClient());
121+
adminZkClient = new AdminZkClient(kafkaServer.zkClient(), Option.empty());
120122
LOG.info("Creating kafka TOPIC [{}]", TOPIC);
121-
adminZkClient.createTopic(TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
123+
adminZkClient.createTopic(TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$, false);
122124
}
123125

124126
/**

kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444

4545
import javax.annotation.Nullable;
4646
import java.nio.charset.Charset;
47+
import java.time.Duration;
4748
import java.util.Arrays;
4849
import java.util.Iterator;
4950
import java.util.List;
@@ -304,7 +305,7 @@ private static void sendData(List<ConsumerRecord<byte[], byte[]>> recordList, @N
304305
@After public void tearDown() {
305306
this.kafkaRecordIterator = null;
306307
if (this.consumer != null) {
307-
this.consumer.close();
308+
this.consumer.close(Duration.ZERO);
308309
}
309310
}
310311

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@
172172
<junit.version>4.13.2</junit.version>
173173
<junit.jupiter.version>5.13.3</junit.jupiter.version>
174174
<junit.vintage.version>5.13.3</junit.vintage.version>
175-
<kafka.version>2.5.0</kafka.version>
175+
<kafka.version>3.9.1</kafka.version>
176176
<kryo.version>5.5.0</kryo.version>
177177
<reflectasm.version>1.11.9</reflectasm.version>
178178
<kudu.version>1.17.0</kudu.version>

0 commit comments

Comments
 (0)