@@ -161,7 +161,7 @@ static int in_kafka_collect(struct flb_input_instance *ins,
161161 ret = FLB_EVENT_ENCODER_SUCCESS ;
162162
163163 while (ret == FLB_EVENT_ENCODER_SUCCESS ) {
164- rkm = rd_kafka_consumer_poll (ctx -> kafka .rk , ctx -> poll_timeount_ms );
164+ rkm = rd_kafka_consumer_poll (ctx -> kafka .rk , ctx -> poll_timeout_ms );
165165
166166 if (!rkm ) {
167167 break ;
@@ -181,7 +181,7 @@ static int in_kafka_collect(struct flb_input_instance *ins,
181181 rd_kafka_message_destroy (rkm );
182182
183183
184- if (!ctx -> enable_auto_commit ) {
184+ if (!ctx -> enable_auto_commit ) {
185185 if (ret == FLB_EVENT_ENCODER_SUCCESS ) {
186186 rd_kafka_commit (ctx -> kafka .rk , NULL , 0 );
187187 }
@@ -248,25 +248,24 @@ static int in_kafka_init(struct flb_input_instance *ins,
248248 goto init_error ;
249249 }
250250
251- /* Set the kafka poll timeout dependend on wether we run in our own
252- * or in the main event thread.
253- * a) run in main event thread:
254- * -> minimize the delay we might create
255- * b) run in our own thread:
256- * -> optimize for throuput and relay on 'fetch.wait.max.ms'
257- * which is set to 500 by default default. wa algin our
258- * timeout with what is set for 'fetch.wait.max.ms'
259- */
260- ctx -> poll_timeount_ms = 1 ;
251+ /* Set the kafka poll timeout depending on whether we run in our own
252+ or in the main event thread.
253+ a) run in main event thread:
254+ -> minimize the delay we might create
255+ b) run in our own thread:
256+ -> optimize for throughput and relay on 'fetch.wait.max.ms'
257+ which is set to 500 by default. we align our
258+ timeout with what is set for 'fetch.wait.max.ms' */
259+ ctx -> poll_timeout_ms = 1 ;
261260 if (ins -> is_threaded ) {
262- ctx -> poll_timeount_ms = 550 ; // ensure kafa triggers timeout
261+ ctx -> poll_timeout_ms = 550 ; /* ensure kafa triggers timeout */
263262
264- // align our timeout with what was configured for fetch.wait.max.ms
263+ /* align our timeout with what was configured for fetch.wait.max.ms */
265264 dsize = sizeof (conf_val );
266265 res = rd_kafka_conf_get (kafka_conf , "fetch.wait.max.ms" , conf_val , & dsize );
267266 if (res == RD_KAFKA_CONF_OK && dsize <= sizeof (conf_val )) {
268- // add 50ms so kafa triggers timout
269- ctx -> poll_timeount_ms = atoi (conf_val ) + 50 ;
267+ /* add 50ms so kafa triggers timeout */
268+ ctx -> poll_timeout_ms = atoi (conf_val ) + 50 ;
270269 }
271270 }
272271
0 commit comments