diff --git a/plugins/in_kafka/in_kafka.c b/plugins/in_kafka/in_kafka.c index 5d149d731e3..b0abd4e7a1b 100644 --- a/plugins/in_kafka/in_kafka.c +++ b/plugins/in_kafka/in_kafka.c @@ -161,7 +161,7 @@ static int in_kafka_collect(struct flb_input_instance *ins, ret = FLB_EVENT_ENCODER_SUCCESS; while (ret == FLB_EVENT_ENCODER_SUCCESS) { - rkm = rd_kafka_consumer_poll(ctx->kafka.rk, 1); + rkm = rd_kafka_consumer_poll(ctx->kafka.rk, ctx->poll_timeout_ms); if (!rkm) { break; @@ -180,8 +180,12 @@ static int in_kafka_collect(struct flb_input_instance *ins, rd_kafka_message_destroy(rkm); - /* TO-DO: commit the record based on `ret` */ - rd_kafka_commit(ctx->kafka.rk, NULL, 0); + + if (!ctx->enable_auto_commit) { + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + rd_kafka_commit(ctx->kafka.rk, NULL, 0); + } + } /* Break from the loop when reaching the limit of polling if available */ if (ctx->polling_threshold != FLB_IN_KAFKA_UNLIMITED && @@ -222,6 +226,7 @@ static int in_kafka_init(struct flb_input_instance *ins, char errstr[512]; (void) data; char conf_val[16]; + size_t dsize; /* Allocate space for the configuration context */ ctx = flb_malloc(sizeof(struct flb_in_kafka_config)); @@ -243,6 +248,27 @@ static int in_kafka_init(struct flb_input_instance *ins, goto init_error; } + /* Set the kafka poll timeout depending on whether we run in our own + or in the main event thread. + a) run in main event thread: + -> minimize the delay we might create + b) run in our own thread: + -> optimize for throughput and relay on 'fetch.wait.max.ms' + which is set to 500 by default. we align our + timeout with what is set for 'fetch.wait.max.ms' */ + ctx->poll_timeout_ms = 1; + if (ins->is_threaded) { + ctx->poll_timeout_ms = 550; /* ensure kafa triggers timeout */ + + /* align our timeout with what was configured for fetch.wait.max.ms */ + dsize = sizeof(conf_val); + res = rd_kafka_conf_get(kafka_conf, "fetch.wait.max.ms", conf_val, &dsize); + if (res == RD_KAFKA_CONF_OK && dsize <= sizeof(conf_val)) { + /* add 50ms so kafka triggers timeout */ + ctx->poll_timeout_ms = atoi(conf_val) + 50; + } + } + if (ctx->buffer_max_size > 0) { ctx->polling_threshold = ctx->buffer_max_size; @@ -428,7 +454,11 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct flb_in_kafka_config, buffer_max_size), "Set the maximum size of chunk" }, - /* EOF */ + { + FLB_CONFIG_MAP_BOOL, "enable_auto_commit", FLB_IN_KAFKA_ENABLE_AUTO_COMMIT, + 0, FLB_TRUE, offsetof(struct flb_in_kafka_config, enable_auto_commit), + "Rely on kafka auto-commit and commit messages in batches" + }, {0} }; diff --git a/plugins/in_kafka/in_kafka.h b/plugins/in_kafka/in_kafka.h index b56d9c66893..ccb48f48584 100644 --- a/plugins/in_kafka/in_kafka.h +++ b/plugins/in_kafka/in_kafka.h @@ -32,6 +32,8 @@ #define FLB_IN_KAFKA_DEFAULT_FORMAT "none" #define FLB_IN_KAFKA_UNLIMITED (size_t)-1 #define FLB_IN_KAFKA_BUFFER_MAX_SIZE "4M" +#define FLB_IN_KAFKA_ENABLE_AUTO_COMMIT "false" +#define FLB_IN_KAFKA_POLL_TIMEOUT_MS "550" // same as kafka fetch.wait.max.ms + 10% enum { FLB_IN_KAFKA_FORMAT_NONE, @@ -48,6 +50,8 @@ struct flb_in_kafka_config { int coll_fd; size_t buffer_max_size; /* Maximum size of chunk allocation */ size_t polling_threshold; + bool enable_auto_commit; + int poll_timeout_ms; }; #endif