Description
It is not obvious in librdkafka, but explicitly setting topic config properties to their default values still marks the config as modified.
That may cause unexpected behaviour.
For example, I use the following config (dump):
Config for topic is
auto.commit.enable: true
auto.commit.interval.ms: 60000
auto.offset.reset: largest
compression.codec: inherit
compression.level: -1
consume.callback.max.messages: 0
message.timeout.ms: 300000
offset.store.method: broker
offset.store.path: .
offset.store.sync.interval.ms: -1
partitioner: consistent_random
produce.offset.report: false
queuing.strategy: fifo
request.required.acks: -1
request.timeout.ms: 30000
After the code in RDKafkaTopicConfig:
try topicConfig.dictionary.forEach { key, value in
    try Self.set(configPointer: configPointer, key: key, value: value)
}
the dumped config is unchanged:
After Config for topic is
auto.commit.enable: true
auto.commit.interval.ms: 60000
auto.offset.reset: largest
compression.codec: inherit
compression.level: -1
consume.callback.max.messages: 0
message.timeout.ms: 300000
offset.store.method: broker
offset.store.path: .
offset.store.sync.interval.ms: -1
partitioner: consistent_random
produce.offset.report: false
queuing.strategy: fifo
request.required.acks: -1
request.timeout.ms: 30000
But since these values are now flagged as modified, I get the following error:
error: -[IntegrationTests.SwiftKafkaTests testProduceAndConsumeWithTransaction] : failed: caught error: "KafkaError.rdKafkaError: Local: Invalid argument or configuration SwiftKafka/RDKafkaTopicHandles.swift:76
As an experiment, I commented out the code above and everything works.
Some more information: the reason lies in how librdkafka works. Internally it keeps 'modified' flags that are raised whenever a value is set, even to its default, and that stay cleared while the config is untouched:
rd_kafka_topic_conf_t *rd_kafka_topic_conf_new(void) {
        rd_kafka_topic_conf_t *tconf = rd_calloc(1, sizeof(*tconf));
        rd_assert(RD_KAFKA_CONF_PROPS_IDX_MAX > sizeof(*tconf) &&
                  *"Increase RD_KAFKA_CONF_PROPS_IDX_MAX");
        rd_kafka_defaultconf_set(_RK_TOPIC, tconf);
        rd_kafka_anyconf_clear_all_is_modified(tconf);
        return tconf;
}
const char *rd_kafka_topic_conf_finalize(rd_kafka_type_t cltype,
                                         const rd_kafka_conf_t *conf,
                                         rd_kafka_topic_conf_t *tconf) {
        ...
        if (rd_kafka_topic_conf_is_modified(tconf, "acks")) {
        ...
        if (rd_kafka_topic_conf_is_modified(tconf,
                                            "queuing.strategy")) {
        ...
        if (conf->eos.transactional_id) {
                if (!rd_kafka_topic_conf_is_modified(
                        tconf, "message.timeout.ms"))
        ...
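To make the effect easier to see, here is a toy model in Swift of the bookkeeping described above. It is only a sketch: `ToyTopicConf` and its members are hypothetical names, and it does not reproduce librdkafka's real data structures. It only mimics the two relevant behaviours: flags are cleared after defaults are applied, and any explicit set raises the flag.

```swift
// Toy model only (hypothetical type, not librdkafka's implementation).
struct ToyTopicConf {
    private(set) var values: [String: String]
    private(set) var modified: Set<String>

    init(defaults: [String: String]) {
        // Mirrors rd_kafka_topic_conf_new(): apply defaults, then clear all flags.
        self.values = defaults
        self.modified = []
    }

    mutating func set(_ key: String, _ value: String) {
        values[key] = value
        // The flag is raised on every explicit set, even if the value equals the default.
        modified.insert(key)
    }

    func isModified(_ key: String) -> Bool {
        modified.contains(key)
    }
}

var conf = ToyTopicConf(defaults: ["message.timeout.ms": "300000"])
let before = conf.values

// Re-apply the default value, as the forEach loop in RDKafkaTopicConfig does.
conf.set("message.timeout.ms", "300000")

let valuesUnchanged = conf.values == before             // the dump looks identical
let flagRaised = conf.isModified("message.timeout.ms")  // but the flag is now set
```

In this model the dump before and after is identical while the flag differs, which matches what the dumps and the error above suggest.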
Furthermore, the topic config parameter accepts nullptr (nil in Swift), in which case librdkafka creates the default topic configuration.
I see several options here for the gsoc interface (there might be more):
- Allow a nil topic configuration and pass it to librdkafka as a null pointer
- Make the values in the topic configuration optional and set only the ones that were changed
- Compare the values in the topic configuration with the defaults and skip the ones that are equal
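The last option could be sketched roughly as follows. `changedEntries(in:comparedTo:)` is a hypothetical helper, not part of the existing code, and it assumes the defaults are available as a `[String: String]` dictionary (for example taken from a dump of a freshly created topic config). It compares plain strings, so property aliases or differently formatted values would not be recognised as equal.

```swift
/// Hypothetical helper: keeps only the entries whose value differs from the default.
/// Keys that are missing from `defaults` are kept as well.
func changedEntries(
    in config: [String: String],
    comparedTo defaults: [String: String]
) -> [String: String] {
    config.filter { defaults[$0.key] != $0.value }
}

// Example values taken from the dump above; only "request.required.acks" is changed.
let defaults = [
    "message.timeout.ms": "300000",
    "request.required.acks": "-1",
]
let requested = [
    "message.timeout.ms": "300000",
    "request.required.acks": "1",
]

// Only these entries would then be passed on to Self.set(configPointer:key:value:).
let toSet = changedEntries(in: requested, comparedTo: defaults)
```

With this filter, "message.timeout.ms" is never set explicitly, so its modified flag would stay cleared.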