From 866594723e3424ef0201458fd94d45ae96712524 Mon Sep 17 00:00:00 2001 From: CoreidCC Date: Wed, 20 Nov 2024 17:13:16 +0100 Subject: [PATCH 1/4] in_kafka: add support to switch to auto-commit Polling every 1ms and committing each message individually results in rather poor performance in high-volume Kafka clusters. Committing in batches (relying on Kafka's auto-commit) drastically improves performance. Signed-off-by: CoreidCC --- plugins/in_kafka/in_kafka.c | 12 ++++++++++-- plugins/in_kafka/in_kafka.h | 2 ++ 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/plugins/in_kafka/in_kafka.c b/plugins/in_kafka/in_kafka.c index 5d149d731e3..ec1007492b4 100644 --- a/plugins/in_kafka/in_kafka.c +++ b/plugins/in_kafka/in_kafka.c @@ -180,8 +180,11 @@ static int in_kafka_collect(struct flb_input_instance *ins, rd_kafka_message_destroy(rkm); - /* TO-DO: commit the record based on `ret` */ - rd_kafka_commit(ctx->kafka.rk, NULL, 0); + + if(!ctx->enable_auto_commit) { + /* TO-DO: commit the record based on `ret` */ + rd_kafka_commit(ctx->kafka.rk, NULL, 0); + } /* Break from the loop when reaching the limit of polling if available */ if (ctx->polling_threshold != FLB_IN_KAFKA_UNLIMITED && @@ -428,6 +431,11 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct flb_in_kafka_config, buffer_max_size), "Set the maximum size of chunk" }, + { + FLB_CONFIG_MAP_BOOL, "enable_auto_commit", FLB_IN_KAFKA_ENABLE_AUTO_COMMIT, + 0, FLB_TRUE, offsetof(struct flb_in_kafka_config, enable_auto_commit), + "Rely on kafka auto-commit and commit messages in batches" + }, /* EOF */ {0} }; diff --git a/plugins/in_kafka/in_kafka.h b/plugins/in_kafka/in_kafka.h index b56d9c66893..41328193b0e 100644 --- a/plugins/in_kafka/in_kafka.h +++ b/plugins/in_kafka/in_kafka.h @@ -32,6 +32,7 @@ #define FLB_IN_KAFKA_DEFAULT_FORMAT "none" #define FLB_IN_KAFKA_UNLIMITED (size_t)-1 #define FLB_IN_KAFKA_BUFFER_MAX_SIZE "4M" +#define FLB_IN_KAFKA_ENABLE_AUTO_COMMIT "false" enum {
FLB_IN_KAFKA_FORMAT_NONE, @@ -48,6 +49,7 @@ struct flb_in_kafka_config { int coll_fd; size_t buffer_max_size; /* Maximum size of chunk allocation */ size_t polling_threshold; + bool enable_auto_commit; }; #endif From ca9d0075b0a1618a03cfffe379f20cd51f89a574 Mon Sep 17 00:00:00 2001 From: CoreidCC Date: Wed, 20 Nov 2024 17:22:27 +0100 Subject: [PATCH 2/4] in_kafka: increase the poll-timeout if we run in our own thread Having a 1ms timeout might make sense if the input plugin is running in the main thread (not introducing delay for others). But if we run in our very own thread then we should not override the fetch.wait.max.ms configuration value from the kafka-consumer. This, in conjunction with using auto-commit, again boosts the throughput significantly. Signed-off-by: CoreidCC --- plugins/in_kafka/in_kafka.c | 17 ++++++++++++++++- plugins/in_kafka/in_kafka.h | 1 + 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/plugins/in_kafka/in_kafka.c b/plugins/in_kafka/in_kafka.c index ec1007492b4..df1605af31c 100644 --- a/plugins/in_kafka/in_kafka.c +++ b/plugins/in_kafka/in_kafka.c @@ -161,7 +161,7 @@ static int in_kafka_collect(struct flb_input_instance *ins, ret = FLB_EVENT_ENCODER_SUCCESS; while (ret == FLB_EVENT_ENCODER_SUCCESS) { - rkm = rd_kafka_consumer_poll(ctx->kafka.rk, 1); + rkm = rd_kafka_consumer_poll(ctx->kafka.rk, ctx->poll_timeount_ms); if (!rkm) { break; @@ -246,6 +246,21 @@ static int in_kafka_init(struct flb_input_instance *ins, goto init_error; } + /* Set the kafka poll timeout dependend on wether we run in our own + * or in the main event thread. + * a) run in main event thread: + * -> minimize the delay we might create + * b) run in our own thread: + * -> optimize for throuput and relay on 'fetch.wait.max.ms' + * which is set to 500 by default default. lets set it to + * twice that so that increasing fetch.wait.max.ms still + * has an effect.
+ */ + ctx->poll_timeount_ms = 1; + if(ins->is_threaded) { + ctx->poll_timeount_ms = 1000; + } + if (ctx->buffer_max_size > 0) { ctx->polling_threshold = ctx->buffer_max_size; diff --git a/plugins/in_kafka/in_kafka.h b/plugins/in_kafka/in_kafka.h index 41328193b0e..7eca5f5341f 100644 --- a/plugins/in_kafka/in_kafka.h +++ b/plugins/in_kafka/in_kafka.h @@ -50,6 +50,7 @@ struct flb_in_kafka_config { size_t buffer_max_size; /* Maximum size of chunk allocation */ size_t polling_threshold; bool enable_auto_commit; + int poll_timeount_ms; }; #endif From cd5375f4241a18935b9b008f4f7ab66b9c584cc5 Mon Sep 17 00:00:00 2001 From: CoreidCC Date: Mon, 2 Dec 2024 19:08:39 +0100 Subject: [PATCH 3/4] in_kafka: make poll timeout configurable Signed-off-by: CoreidCC --- plugins/in_kafka/in_kafka.c | 24 ++++++++++++++++-------- plugins/in_kafka/in_kafka.h | 1 + 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/plugins/in_kafka/in_kafka.c b/plugins/in_kafka/in_kafka.c index df1605af31c..395217cc378 100644 --- a/plugins/in_kafka/in_kafka.c +++ b/plugins/in_kafka/in_kafka.c @@ -182,8 +182,9 @@ static int in_kafka_collect(struct flb_input_instance *ins, if(!ctx->enable_auto_commit) { - /* TO-DO: commit the record based on `ret` */ - rd_kafka_commit(ctx->kafka.rk, NULL, 0); + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + rd_kafka_commit(ctx->kafka.rk, NULL, 0); + } } /* Break from the loop when reaching the limit of polling if available */ @@ -225,6 +226,7 @@ static int in_kafka_init(struct flb_input_instance *ins, char errstr[512]; (void) data; char conf_val[16]; + size_t dsize; /* Allocate space for the configuration context */ ctx = flb_malloc(sizeof(struct flb_in_kafka_config)); @@ -252,13 +254,20 @@ static int in_kafka_init(struct flb_input_instance *ins, * -> minimize the delay we might create * b) run in our own thread: * -> optimize for throuput and relay on 'fetch.wait.max.ms' - * which is set to 500 by default default.
lets set it to - * twice that so that increasing fetch.wait.max.ms still - * has an effect. + * which is set to 500 by default default. wa algin our + * timeout with what is set for 'fetch.wait.max.ms' */ ctx->poll_timeount_ms = 1; - if(ins->is_threaded) { - ctx->poll_timeount_ms = 1000; + if (ins->is_threaded) { + ctx->poll_timeount_ms = 550; // ensure kafa triggers timeout + + // align our timeout with what was configured for fetch.wait.max.ms + dsize = sizeof(conf_val); + res = rd_kafka_conf_get(kafka_conf, "fetch.wait.max.ms", conf_val, &dsize); + if (res == RD_KAFKA_CONF_OK && dsize <= sizeof(conf_val)) { + // add 50ms so kafa triggers timout + ctx->poll_timeount_ms = atoi(conf_val) + 50; + } } if (ctx->buffer_max_size > 0) { @@ -451,7 +460,6 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct flb_in_kafka_config, enable_auto_commit), "Rely on kafka auto-commit and commit messages in batches" }, - /* EOF */ {0} }; diff --git a/plugins/in_kafka/in_kafka.h b/plugins/in_kafka/in_kafka.h index 7eca5f5341f..7a57e0ca68b 100644 --- a/plugins/in_kafka/in_kafka.h +++ b/plugins/in_kafka/in_kafka.h @@ -33,6 +33,7 @@ #define FLB_IN_KAFKA_UNLIMITED (size_t)-1 #define FLB_IN_KAFKA_BUFFER_MAX_SIZE "4M" #define FLB_IN_KAFKA_ENABLE_AUTO_COMMIT "false" +#define FLB_IN_KAFKA_POLL_TIMEOUT_MS "550" // same as kafka fetch.wait.max.ms + 10% enum { FLB_IN_KAFKA_FORMAT_NONE, From 2721ce06af651ac19c9da1e0559f4b278a3bd121 Mon Sep 17 00:00:00 2001 From: CoreidCC Date: Thu, 9 Jan 2025 08:57:43 +0100 Subject: [PATCH 4/4] in_kafka: formatting adjustments and typos Signed-off-by: CoreidCC --- plugins/in_kafka/in_kafka.c | 31 +++++++++++++++---------------- plugins/in_kafka/in_kafka.h | 2 +- 2 files changed, 16 insertions(+), 17 deletions(-) diff --git a/plugins/in_kafka/in_kafka.c b/plugins/in_kafka/in_kafka.c index 395217cc378..b0abd4e7a1b 100644 --- a/plugins/in_kafka/in_kafka.c +++ b/plugins/in_kafka/in_kafka.c @@ -161,7 +161,7 @@ static int 
in_kafka_collect(struct flb_input_instance *ins, ret = FLB_EVENT_ENCODER_SUCCESS; while (ret == FLB_EVENT_ENCODER_SUCCESS) { - rkm = rd_kafka_consumer_poll(ctx->kafka.rk, ctx->poll_timeount_ms); + rkm = rd_kafka_consumer_poll(ctx->kafka.rk, ctx->poll_timeout_ms); if (!rkm) { break; @@ -181,7 +181,7 @@ static int in_kafka_collect(struct flb_input_instance *ins, rd_kafka_message_destroy(rkm); - if(!ctx->enable_auto_commit) { + if (!ctx->enable_auto_commit) { if (ret == FLB_EVENT_ENCODER_SUCCESS) { rd_kafka_commit(ctx->kafka.rk, NULL, 0); } @@ -248,25 +248,24 @@ static int in_kafka_init(struct flb_input_instance *ins, goto init_error; } - /* Set the kafka poll timeout dependend on wether we run in our own - * or in the main event thread. - * a) run in main event thread: - * -> minimize the delay we might create - * b) run in our own thread: - * -> optimize for throuput and relay on 'fetch.wait.max.ms' - * which is set to 500 by default default. wa algin our - * timeout with what is set for 'fetch.wait.max.ms' - */ - ctx->poll_timeount_ms = 1; + /* Set the kafka poll timeout depending on whether we run in our own + or in the main event thread. + a) run in main event thread: + -> minimize the delay we might create + b) run in our own thread: + -> optimize for throughput and relay on 'fetch.wait.max.ms' + which is set to 500 by default. 
we align our + timeout with what is set for 'fetch.wait.max.ms' */ + ctx->poll_timeout_ms = 1; if (ins->is_threaded) { - ctx->poll_timeount_ms = 550; // ensure kafa triggers timeout + ctx->poll_timeout_ms = 550; /* ensure kafa triggers timeout */ - // align our timeout with what was configured for fetch.wait.max.ms + /* align our timeout with what was configured for fetch.wait.max.ms */ dsize = sizeof(conf_val); res = rd_kafka_conf_get(kafka_conf, "fetch.wait.max.ms", conf_val, &dsize); if (res == RD_KAFKA_CONF_OK && dsize <= sizeof(conf_val)) { - // add 50ms so kafa triggers timout - ctx->poll_timeount_ms = atoi(conf_val) + 50; + /* add 50ms so kafka triggers timeout */ + ctx->poll_timeout_ms = atoi(conf_val) + 50; } } diff --git a/plugins/in_kafka/in_kafka.h b/plugins/in_kafka/in_kafka.h index 7a57e0ca68b..ccb48f48584 100644 --- a/plugins/in_kafka/in_kafka.h +++ b/plugins/in_kafka/in_kafka.h @@ -51,7 +51,7 @@ struct flb_in_kafka_config { size_t buffer_max_size; /* Maximum size of chunk allocation */ size_t polling_threshold; bool enable_auto_commit; - int poll_timeount_ms; + int poll_timeout_ms; }; #endif