diff --git a/plugins/in_kafka/in_kafka.c b/plugins/in_kafka/in_kafka.c index 5d149d731e3..4d0aa4eb492 100644 --- a/plugins/in_kafka/in_kafka.c +++ b/plugins/in_kafka/in_kafka.c @@ -161,7 +161,25 @@ static int in_kafka_collect(struct flb_input_instance *ins, ret = FLB_EVENT_ENCODER_SUCCESS; while (ret == FLB_EVENT_ENCODER_SUCCESS) { - rkm = rd_kafka_consumer_poll(ctx->kafka.rk, 1); + /* Set the Kafka poll timeout based on execution mode: + * + * a) Running in the main event loop (non-threaded): + * - Use a minimal timeout to avoid blocking other inputs. + * + * b) Running in a dedicated thread: + * - Optimize for throughput by allowing Kafka's internal batching. + * - Align with 'fetch.wait.max.ms' (default: 500ms) to maximize batch efficiency. + * - Set timeout slightly higher than 'fetch.wait.max.ms' (e.g., 1.5x - 2x) to + * ensure it does not interfere with Kafka’s fetch behavior, while still + * keeping the consumer responsive. + */ + if (ctx->ins->flags & FLB_INPUT_THREADED) { + /* Threaded mode: Optimize for batch processing and efficiency */ + rkm = rd_kafka_consumer_poll(ctx->kafka.rk, ctx->poll_timeout_ms); + } else { + /* Main event loop: Minimize delay for non-blocking execution */ + rkm = rd_kafka_consumer_poll(ctx->kafka.rk, 1); + } if (!rkm) { break; @@ -428,6 +446,14 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct flb_in_kafka_config, buffer_max_size), "Set the maximum size of chunk" }, + { + FLB_CONFIG_MAP_INT, "poll_timeout_ms", "1", + 0, FLB_TRUE, offsetof(struct flb_in_kafka_config, poll_timeout_ms), + "Set the timeout in milliseconds for Kafka consumer poll operations. " + "This option only takes effect when running in a dedicated thread (i.e., when 'threaded' is enabled). " + "Using a higher timeout (e.g., 1.5x - 2x 'rdkafka.fetch.wait.max.ms') " + "can improve efficiency by leveraging Kafka's batching mechanism." + }, /* EOF */ {0} }; diff --git a/plugins/in_kafka/in_kafka.h b/plugins/in_kafka/in_kafka.h index b56d9c66893..519b3df0683 100644 --- a/plugins/in_kafka/in_kafka.h +++ b/plugins/in_kafka/in_kafka.h @@ -48,6 +48,7 @@ struct flb_in_kafka_config { int coll_fd; size_t buffer_max_size; /* Maximum size of chunk allocation */ size_t polling_threshold; + int poll_timeout_ms; }; #endif