Skip to content

Commit 08878bc

Browse files
committed
in_kafka: add support to switch to auto-commit
Polling every 1ms and committing each message individually results in rather pure performance in high volume Kafka clusters. Commiting in batches (relay on auto-commit of kafka) drastically improves performance. Signed-off-by: CoreidCC <[email protected]>
1 parent 09214eb commit 08878bc

File tree

2 files changed

+12
-2
lines changed

2 files changed

+12
-2
lines changed

plugins/in_kafka/in_kafka.c

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,8 +180,11 @@ static int in_kafka_collect(struct flb_input_instance *ins,
180180

181181
rd_kafka_message_destroy(rkm);
182182

183-
/* TO-DO: commit the record based on `ret` */
184-
rd_kafka_commit(ctx->kafka.rk, NULL, 0);
183+
184+
if(!ctx->enable_auto_commit) {
185+
/* TO-DO: commit the record based on `ret` */
186+
rd_kafka_commit(ctx->kafka.rk, NULL, 0);
187+
}
185188

186189
/* Break from the loop when reaching the limit of polling if available */
187190
if (ctx->polling_threshold != FLB_IN_KAFKA_UNLIMITED &&
@@ -428,6 +431,11 @@ static struct flb_config_map config_map[] = {
428431
0, FLB_TRUE, offsetof(struct flb_in_kafka_config, buffer_max_size),
429432
"Set the maximum size of chunk"
430433
},
434+
{
435+
FLB_CONFIG_MAP_BOOL, "enable_auto_commit", FLB_IN_KAFKA_ENABLE_AUTO_COMMIT,
436+
0, FLB_TRUE, offsetof(struct flb_in_kafka_config, enable_auto_commit),
437+
"Rely on kafka auto-commit and commit messages in batches"
438+
},
431439
/* EOF */
432440
{0}
433441
};

plugins/in_kafka/in_kafka.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
#define FLB_IN_KAFKA_DEFAULT_FORMAT "none"
3333
#define FLB_IN_KAFKA_UNLIMITED (size_t)-1
3434
#define FLB_IN_KAFKA_BUFFER_MAX_SIZE "4M"
35+
#define FLB_IN_KAFKA_ENABLE_AUTO_COMMIT "false"
3536

3637
enum {
3738
FLB_IN_KAFKA_FORMAT_NONE,
@@ -48,6 +49,7 @@ struct flb_in_kafka_config {
4849
int coll_fd;
4950
size_t buffer_max_size; /* Maximum size of chunk allocation */
5051
size_t polling_threshold;
52+
bool enable_auto_commit;
5153
};
5254

5355
#endif

0 commit comments

Comments
 (0)