Skip to content

Commit 8e16158

Browse files
agavrahachikuji
authored andcommitted
KAFKA-8305; Support default partitions & replication factor in AdminClient#createTopic (KIP-464) (apache#6728)
This commit makes three changes: - Adds a constructor for NewTopic(String, Optional<Integer>, Optional<Short>) which allows users to specify Optional.empty() for numPartitions or replicationFactor in order to use the broker default. - Changes AdminManager to accept -1 as valid options for replication factor and numPartitions (resolving to broker defaults). - Makes --partitions and --replication-factor optional arguments when creating topics using kafka-topics.sh. - Adds a dependency on scalaJava8Compat library to make it simpler to convert Scala Option to Java Optional Reviewers: Ismael Juma <[email protected]>, Ryanne Dolan <[email protected]>, Jason Gustafson <[email protected]>
1 parent b6d9e15 commit 8e16158

File tree

18 files changed

+248
-78
lines changed

18 files changed

+248
-78
lines changed

build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -939,6 +939,8 @@ project(':clients') {
939939
compile libs.lz4
940940
compile libs.snappy
941941
compile libs.slf4jApi
942+
compile libs.scalaJava8Compat
943+
942944
compileOnly libs.jacksonDatabind // for SASL/OAUTHBEARER bearer token parsing
943945
compileOnly libs.jacksonJDK8Datatypes
944946

clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.kafka.clients.admin;
1919

20+
import java.util.Optional;
2021
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
2122
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
2223
import org.apache.kafka.common.message.CreateTopicsRequestData.CreateableTopicConfig;
@@ -31,16 +32,29 @@
3132
* A new topic to be created via {@link AdminClient#createTopics(Collection)}.
3233
*/
3334
public class NewTopic {
35+
36+
private static final int NO_PARTITIONS = -1;
37+
private static final short NO_REPLICATION_FACTOR = -1;
38+
3439
private final String name;
35-
private final int numPartitions;
36-
private final short replicationFactor;
40+
private final Optional<Integer> numPartitions;
41+
private final Optional<Short> replicationFactor;
3742
private final Map<Integer, List<Integer>> replicasAssignments;
3843
private Map<String, String> configs = null;
3944

4045
/**
4146
* A new topic with the specified replication factor and number of partitions.
4247
*/
4348
public NewTopic(String name, int numPartitions, short replicationFactor) {
49+
this(name, Optional.of(numPartitions), Optional.of(replicationFactor));
50+
}
51+
52+
/**
53+
* A new topic that optionally defaults {@code numPartitions} and {@code replicationFactor} to
54+
* the broker configurations for {@code num.partitions} and {@code default.replication.factor}
55+
* respectively.
56+
*/
57+
public NewTopic(String name, Optional<Integer> numPartitions, Optional<Short> replicationFactor) {
4458
this.name = name;
4559
this.numPartitions = numPartitions;
4660
this.replicationFactor = replicationFactor;
@@ -56,8 +70,8 @@ public NewTopic(String name, int numPartitions, short replicationFactor) {
5670
*/
5771
public NewTopic(String name, Map<Integer, List<Integer>> replicasAssignments) {
5872
this.name = name;
59-
this.numPartitions = -1;
60-
this.replicationFactor = -1;
73+
this.numPartitions = Optional.empty();
74+
this.replicationFactor = Optional.empty();
6175
this.replicasAssignments = Collections.unmodifiableMap(replicasAssignments);
6276
}
6377

@@ -72,14 +86,14 @@ public String name() {
7286
* The number of partitions for the new topic or -1 if a replica assignment has been specified.
7387
*/
7488
public int numPartitions() {
75-
return numPartitions;
89+
return numPartitions.orElse(NO_PARTITIONS);
7690
}
7791

7892
/**
7993
* The replication factor for the new topic or -1 if a replica assignment has been specified.
8094
*/
8195
public short replicationFactor() {
82-
return replicationFactor;
96+
return replicationFactor.orElse(NO_REPLICATION_FACTOR);
8397
}
8498

8599
/**
@@ -111,8 +125,8 @@ public Map<String, String> configs() {
111125
CreatableTopic convertToCreatableTopic() {
112126
CreatableTopic creatableTopic = new CreatableTopic().
113127
setName(name).
114-
setNumPartitions(numPartitions).
115-
setReplicationFactor(replicationFactor);
128+
setNumPartitions(numPartitions.orElse(NO_PARTITIONS)).
129+
setReplicationFactor(replicationFactor.orElse(NO_REPLICATION_FACTOR));
116130
if (replicasAssignments != null) {
117131
for (Entry<Integer, List<Integer>> entry : replicasAssignments.entrySet()) {
118132
creatableTopic.assignments().add(
@@ -136,8 +150,8 @@ CreatableTopic convertToCreatableTopic() {
136150
public String toString() {
137151
StringBuilder bld = new StringBuilder();
138152
bld.append("(name=").append(name).
139-
append(", numPartitions=").append(numPartitions).
140-
append(", replicationFactor=").append(replicationFactor).
153+
append(", numPartitions=").append(numPartitions.map(String::valueOf).orElse("default")).
154+
append(", replicationFactor=").append(replicationFactor.map(String::valueOf).orElse("default")).
141155
append(", replicasAssignments=").append(replicasAssignments).
142156
append(", configs=").append(configs).
143157
append(")");

clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616
*/
1717
package org.apache.kafka.common.requests;
1818

19+
import java.nio.ByteBuffer;
20+
import java.util.List;
21+
import java.util.stream.Collectors;
1922
import org.apache.kafka.common.errors.UnsupportedVersionException;
2023
import org.apache.kafka.common.message.CreateTopicsRequestData;
2124
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
@@ -24,8 +27,6 @@
2427
import org.apache.kafka.common.protocol.ApiKeys;
2528
import org.apache.kafka.common.protocol.types.Struct;
2629

27-
import java.nio.ByteBuffer;
28-
2930
public class CreateTopicsRequest extends AbstractRequest {
3031
public static class Builder extends AbstractRequest.Builder<CreateTopicsRequest> {
3132
private final CreateTopicsRequestData data;
@@ -40,6 +41,23 @@ public CreateTopicsRequest build(short version) {
4041
if (data.validateOnly() && version == 0)
4142
throw new UnsupportedVersionException("validateOnly is not supported in version 0 of " +
4243
"CreateTopicsRequest");
44+
45+
final List<String> topicsWithDefaults = data.topics()
46+
.stream()
47+
.filter(topic -> topic.assignments().isEmpty())
48+
.filter(topic ->
49+
topic.numPartitions() == CreateTopicsRequest.NO_NUM_PARTITIONS
50+
|| topic.replicationFactor() == CreateTopicsRequest.NO_REPLICATION_FACTOR)
51+
.map(CreatableTopic::name)
52+
.collect(Collectors.toList());
53+
54+
if (!topicsWithDefaults.isEmpty() && version < 4) {
55+
throw new UnsupportedVersionException("Creating topics with default "
56+
+ "partitions/replication factor are only supported in CreateTopicRequest "
57+
+ "version 4+. The following topics need values for partitions and replicas: "
58+
+ topicsWithDefaults);
59+
}
60+
4361
return new CreateTopicsRequest(data, version);
4462
}
4563

clients/src/main/resources/common/message/CreateTopicsRequest.json

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,17 @@
1818
"type": "request",
1919
"name": "CreateTopicsRequest",
2020
// Version 1 adds validateOnly.
21-
"validVersions": "0-3",
21+
// Version 4 makes partitions/replicationFactor optional even when assignments are not present (KIP-464)
22+
"validVersions": "0-4",
2223
"fields": [
2324
{ "name": "Topics", "type": "[]CreatableTopic", "versions": "0+",
2425
"about": "The topics to create.", "fields": [
2526
{ "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName",
2627
"about": "The topic name." },
2728
{ "name": "NumPartitions", "type": "int32", "versions": "0+",
28-
"about": "The number of partitions to create in the topic, or -1 if we are specifying a manual partition assignment." },
29+
"about": "The number of partitions to create in the topic, or -1 if we are either specifying a manual partition assignment or using the default partitions." },
2930
{ "name": "ReplicationFactor", "type": "int16", "versions": "0+",
30-
"about": "The number of replicas to create for each partition in the topic, or -1 if we are specifying a manual partition assignment." },
31+
"about": "The number of replicas to create for each partition in the topic, or -1 if we are either specifying a manual partition assignment or using the default replication factor." },
3132
{ "name": "Assignments", "type": "[]CreatableReplicaAssignment", "versions": "0+",
3233
"about": "The manual partition assignment, or the empty array if we are using automatic assignment.", "fields": [
3334
{ "name": "PartitionIndex", "type": "int32", "versions": "0+", "mapKey": true,

clients/src/main/resources/common/message/CreateTopicsResponse.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@
2020
// Version 1 adds a per-topic error message string.
2121
// Version 2 adds the throttle time.
2222
// Starting in version 3, on quota violation, brokers send out responses before throttling.
23-
"validVersions": "0-3",
23+
// Version 4 makes partitions/replicationFactor optional even when assignments are not present (KIP-464)
24+
"validVersions": "0-4",
2425
"fields": [
2526
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "2+", "ignorable": true,
2627
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },

clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@
8282
import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation;
8383
import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
8484
import org.apache.kafka.common.requests.CreatePartitionsRequest.PartitionDetails;
85+
import org.apache.kafka.common.requests.CreateTopicsRequest.Builder;
8586
import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
8687
import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
8788
import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
@@ -120,6 +121,7 @@
120121
import static org.apache.kafka.test.TestUtils.toBuffer;
121122
import static org.junit.Assert.assertEquals;
122123
import static org.junit.Assert.assertFalse;
124+
import static org.junit.Assert.assertThrows;
123125
import static org.junit.Assert.assertTrue;
124126
import static org.junit.Assert.fail;
125127

@@ -644,6 +646,28 @@ public void testCreateTopicRequestV0FailsIfValidateOnly() {
644646
createCreateTopicRequest(0, true);
645647
}
646648

649+
@Test
650+
public void testCreateTopicRequestV3FailsIfNoPartitionsOrReplicas() {
651+
final UnsupportedVersionException exception = assertThrows(
652+
UnsupportedVersionException.class, () -> {
653+
CreateTopicsRequestData data = new CreateTopicsRequestData()
654+
.setTimeoutMs(123)
655+
.setValidateOnly(false);
656+
data.topics().add(new CreatableTopic().
657+
setName("foo").
658+
setNumPartitions(CreateTopicsRequest.NO_NUM_PARTITIONS).
659+
setReplicationFactor((short) 1));
660+
data.topics().add(new CreatableTopic().
661+
setName("bar").
662+
setNumPartitions(1).
663+
setReplicationFactor(CreateTopicsRequest.NO_REPLICATION_FACTOR));
664+
665+
new Builder(data).build((short) 3);
666+
});
667+
assertTrue(exception.getMessage().contains("supported in CreateTopicRequest version 4+"));
668+
assertTrue(exception.getMessage().contains("[foo, bar]"));
669+
}
670+
647671
@Test
648672
public void testFetchRequestMaxBytesOldVersions() throws Exception {
649673
final short version = 1;

core/src/main/scala/kafka/admin/TopicCommand.scala

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ import kafka.zk.{AdminZkClient, KafkaZkClient}
3030
import org.apache.kafka.clients.CommonClientConfigs
3131
import org.apache.kafka.clients.admin.{ListTopicsOptions, NewPartitions, NewTopic, AdminClient => JAdminClient}
3232
import org.apache.kafka.common.TopicPartition
33-
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
3433
import org.apache.kafka.common.config.ConfigResource.Type
34+
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
3535
import org.apache.kafka.common.errors.{InvalidTopicException, TopicExistsException}
3636
import org.apache.kafka.common.internals.Topic
3737
import org.apache.kafka.common.security.JaasUtils
@@ -40,6 +40,7 @@ import org.apache.zookeeper.KeeperException.NodeExistsException
4040

4141
import scala.collection.JavaConverters._
4242
import scala.collection._
43+
import scala.compat.java8.OptionConverters._
4344
import scala.io.StdIn
4445

4546
object TopicCommand extends Logging {
@@ -82,7 +83,7 @@ object TopicCommand extends Logging {
8283
class CommandTopicPartition(opts: TopicCommandOptions) {
8384
val name: String = opts.topic.get
8485
val partitions: Option[Integer] = opts.partitions
85-
val replicationFactor: Integer = opts.replicationFactor.getOrElse(-1)
86+
val replicationFactor: Option[Integer] = opts.replicationFactor
8687
val replicaAssignment: Option[Map[Int, List[Int]]] = opts.replicaAssignment
8788
val configsToAdd: Properties = parseTopicConfigsToBeAdded(opts)
8889
val configsToDelete: Seq[String] = parseTopicConfigsToBeDeleted(opts)
@@ -172,14 +173,21 @@ object TopicCommand extends Logging {
172173
case class AdminClientTopicService private (adminClient: JAdminClient) extends TopicService {
173174

174175
override def createTopic(topic: CommandTopicPartition): Unit = {
175-
if (topic.replicationFactor > Short.MaxValue)
176-
throw new IllegalArgumentException(s"The replication factor's maximum value must be smaller or equal to ${Short.MaxValue}")
176+
if (topic.replicationFactor.exists(rf => rf > Short.MaxValue || rf < 1))
177+
throw new IllegalArgumentException(s"The replication factor must be between 1 and ${Short.MaxValue} inclusive")
178+
if (topic.partitions.exists(partitions => partitions < 1))
179+
throw new IllegalArgumentException(s"The partitions must be greater than 0")
177180

178181
if (!adminClient.listTopics().names().get().contains(topic.name)) {
179182
val newTopic = if (topic.hasReplicaAssignment)
180183
new NewTopic(topic.name, asJavaReplicaReassignment(topic.replicaAssignment.get))
181-
else
182-
new NewTopic(topic.name, topic.partitions.get, topic.replicationFactor.shortValue())
184+
else {
185+
new NewTopic(
186+
topic.name,
187+
topic.partitions.asJava,
188+
topic.replicationFactor.map(_.toShort).map(Short.box).asJava)
189+
}
190+
183191
val configsMap = topic.configsToAdd.stringPropertyNames()
184192
.asScala
185193
.map(name => name -> topic.configsToAdd.getProperty(name))
@@ -289,7 +297,7 @@ object TopicCommand extends Logging {
289297
if (topic.hasReplicaAssignment)
290298
adminZkClient.createTopicWithAssignment(topic.name, topic.configsToAdd, topic.replicaAssignment.get)
291299
else
292-
adminZkClient.createTopic(topic.name, topic.partitions.get, topic.replicationFactor, topic.configsToAdd, topic.rackAwareMode)
300+
adminZkClient.createTopic(topic.name, topic.partitions.get, topic.replicationFactor.get, topic.configsToAdd, topic.rackAwareMode)
293301
println(s"Created topic ${topic.name}.")
294302
} catch {
295303
case e: TopicExistsException => if (!topic.ifTopicDoesntExist()) throw e
@@ -538,11 +546,11 @@ object TopicCommand extends Logging {
538546
.describedAs("name")
539547
.ofType(classOf[String])
540548
private val partitionsOpt = parser.accepts("partitions", "The number of partitions for the topic being created or " +
541-
"altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected")
549+
"altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected). If not supplied for create, defaults to the cluster default.")
542550
.withRequiredArg
543551
.describedAs("# of partitions")
544552
.ofType(classOf[java.lang.Integer])
545-
private val replicationFactorOpt = parser.accepts("replication-factor", "The replication factor for each partition in the topic being created.")
553+
private val replicationFactorOpt = parser.accepts("replication-factor", "The replication factor for each partition in the topic being created. If not supplied, defaults to the cluster default.")
546554
.withRequiredArg
547555
.describedAs("replication factor")
548556
.ofType(classOf[java.lang.Integer])
@@ -633,7 +641,7 @@ object TopicCommand extends Logging {
633641
CommandLineUtils.checkRequiredArgs(parser, options, topicOpt)
634642
if (!has(listOpt) && !has(describeOpt))
635643
CommandLineUtils.checkRequiredArgs(parser, options, topicOpt)
636-
if (has(createOpt) && !has(replicaAssignmentOpt))
644+
if (has(createOpt) && !has(replicaAssignmentOpt) && has(zkConnectOpt))
637645
CommandLineUtils.checkRequiredArgs(parser, options, partitionsOpt, replicationFactorOpt)
638646
if (has(bootstrapServerOpt) && has(alterOpt)) {
639647
CommandLineUtils.checkInvalidArgsSet(parser, options, Set(bootstrapServerOpt, configOpt), Set(alterOpt))

core/src/main/scala/kafka/server/AdminManager.scala

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ class AdminManager(val config: KafkaConfig,
6262

6363
def hasDelayedTopicOperations = topicPurgatory.numDelayed != 0
6464

65+
private val defaultNumPartitions = config.numPartitions.intValue()
66+
private val defaultReplicationFactor = config.defaultReplicationFactor.shortValue()
67+
6568
/**
6669
* Try to complete delayed topic operations with the request key
6770
*/
@@ -95,8 +98,15 @@ class AdminManager(val config: KafkaConfig,
9598
throw new InvalidRequestException("Both numPartitions or replicationFactor and replicasAssignments were set. " +
9699
"Both cannot be used at the same time.")
97100
}
101+
102+
val resolvedNumPartitions = if (topic.numPartitions == NO_NUM_PARTITIONS)
103+
defaultNumPartitions else topic.numPartitions
104+
val resolvedReplicationFactor = if (topic.replicationFactor == NO_REPLICATION_FACTOR)
105+
defaultReplicationFactor else topic.replicationFactor
106+
98107
val assignments = if (topic.assignments().isEmpty) {
99-
AdminUtils.assignReplicasToBrokers(brokers, topic.numPartitions, topic.replicationFactor)
108+
AdminUtils.assignReplicasToBrokers(
109+
brokers, resolvedNumPartitions, resolvedReplicationFactor)
100110
} else {
101111
val assignments = new mutable.HashMap[Int, Seq[Int]]
102112
// Note: we don't check that replicaAssignment contains unknown brokers - unlike in add-partitions case,
@@ -115,9 +125,9 @@ class AdminManager(val config: KafkaConfig,
115125

116126
// Use `null` for unset fields in the public API
117127
val numPartitions: java.lang.Integer =
118-
if (topic.numPartitions == NO_NUM_PARTITIONS) null else topic.numPartitions
128+
if (topic.assignments().isEmpty) resolvedNumPartitions else null
119129
val replicationFactor: java.lang.Short =
120-
if (topic.replicationFactor == NO_REPLICATION_FACTOR) null else topic.replicationFactor
130+
if (topic.assignments().isEmpty) resolvedReplicationFactor else null
121131
val javaAssignments = if (topic.assignments().isEmpty) {
122132
null
123133
} else {

0 commit comments

Comments
 (0)