@@ -46,9 +46,9 @@ the same `group_id`.
46
46
Ideally you should have as many threads as the number of partitions for a perfect balance --
47
47
more threads than partitions means that some threads will be idle
48
48
49
- For more information see http ://kafka.apache.org/documentation.html#theconsumer
49
+ For more information see https ://kafka.apache.org/24 /documentation.html#theconsumer
50
50
51
- Kafka consumer configuration: http ://kafka.apache.org/documentation.html#consumerconfigs
51
+ Kafka consumer configuration: https ://kafka.apache.org/24 /documentation.html#consumerconfigs
52
52
53
53
==== Metadata fields
54
54
@@ -71,46 +71,48 @@ inserted into your original event, you'll have to use the `mutate` filter to man
71
71
72
72
This plugin supports these configuration options plus the <<plugins-{type}s-{plugin}-common-options>> described later.
73
73
74
- NOTE: Some of these options map to a Kafka option. See the https://kafka.apache.org/documentation for more details.
74
+ NOTE: Some of these options map to a Kafka option. Defaults usually reflect the Kafka default setting,
75
+ and might change if Kafka's consumer defaults change.
76
+ See the https://kafka.apache.org/24/documentation for more details.
75
77
76
78
[cols="<,<,<",options="header",]
77
79
|=======================================================================
78
80
|Setting |Input type|Required
79
- | <<plugins-{type}s-{plugin}-auto_commit_interval_ms>> |<<string,string >>|No
81
+ | <<plugins-{type}s-{plugin}-auto_commit_interval_ms>> |<<number,number >>|No
80
82
| <<plugins-{type}s-{plugin}-auto_offset_reset>> |<<string,string>>|No
81
83
| <<plugins-{type}s-{plugin}-bootstrap_servers>> |<<string,string>>|No
82
- | <<plugins-{type}s-{plugin}-check_crcs>> |<<string,string >>|No
84
+ | <<plugins-{type}s-{plugin}-check_crcs>> |<<boolean,boolean >>|No
83
85
| <<plugins-{type}s-{plugin}-client_id>> |<<string,string>>|No
84
86
| <<plugins-{type}s-{plugin}-client_rack>> |<<string,string>>|No
85
- | <<plugins-{type}s-{plugin}-connections_max_idle_ms>> |<<string,string >>|No
87
+ | <<plugins-{type}s-{plugin}-connections_max_idle_ms>> |<<number,number >>|No
86
88
| <<plugins-{type}s-{plugin}-consumer_threads>> |<<number,number>>|No
87
89
| <<plugins-{type}s-{plugin}-decorate_events>> |<<boolean,boolean>>|No
88
- | <<plugins-{type}s-{plugin}-enable_auto_commit>> |<<string,string >>|No
90
+ | <<plugins-{type}s-{plugin}-enable_auto_commit>> |<<boolean,boolean >>|No
89
91
| <<plugins-{type}s-{plugin}-exclude_internal_topics>> |<<string,string>>|No
90
- | <<plugins-{type}s-{plugin}-fetch_max_bytes>> |<<string,string >>|No
91
- | <<plugins-{type}s-{plugin}-fetch_max_wait_ms>> |<<string,string >>|No
92
- | <<plugins-{type}s-{plugin}-fetch_min_bytes>> |<<string,string >>|No
92
+ | <<plugins-{type}s-{plugin}-fetch_max_bytes>> |<<number,number >>|No
93
+ | <<plugins-{type}s-{plugin}-fetch_max_wait_ms>> |<<number,number >>|No
94
+ | <<plugins-{type}s-{plugin}-fetch_min_bytes>> |<<number,number >>|No
93
95
| <<plugins-{type}s-{plugin}-group_id>> |<<string,string>>|No
94
- | <<plugins-{type}s-{plugin}-heartbeat_interval_ms>> |<<string,string >>|No
96
+ | <<plugins-{type}s-{plugin}-heartbeat_interval_ms>> |<<number,number >>|No
95
97
| <<plugins-{type}s-{plugin}-jaas_path>> |a valid filesystem path|No
96
98
| <<plugins-{type}s-{plugin}-kerberos_config>> |a valid filesystem path|No
97
99
| <<plugins-{type}s-{plugin}-key_deserializer_class>> |<<string,string>>|No
98
- | <<plugins-{type}s-{plugin}-max_partition_fetch_bytes>> |<<string,string >>|No
99
- | <<plugins-{type}s-{plugin}-max_poll_interval_ms>> |<<string,string >>|No
100
- | <<plugins-{type}s-{plugin}-max_poll_records>> |<<string,string >>|No
101
- | <<plugins-{type}s-{plugin}-metadata_max_age_ms>> |<<string,string >>|No
100
+ | <<plugins-{type}s-{plugin}-max_partition_fetch_bytes>> |<<number,number >>|No
101
+ | <<plugins-{type}s-{plugin}-max_poll_interval_ms>> |<<number,number >>|No
102
+ | <<plugins-{type}s-{plugin}-max_poll_records>> |<<number,number >>|No
103
+ | <<plugins-{type}s-{plugin}-metadata_max_age_ms>> |<<number,number >>|No
102
104
| <<plugins-{type}s-{plugin}-partition_assignment_strategy>> |<<string,string>>|No
103
105
| <<plugins-{type}s-{plugin}-poll_timeout_ms>> |<<number,number>>|No
104
- | <<plugins-{type}s-{plugin}-receive_buffer_bytes>> |<<string,string >>|No
105
- | <<plugins-{type}s-{plugin}-reconnect_backoff_ms>> |<<string,string >>|No
106
- | <<plugins-{type}s-{plugin}-request_timeout_ms>> |<<string,string >>|No
107
- | <<plugins-{type}s-{plugin}-retry_backoff_ms>> |<<string,string >>|No
106
+ | <<plugins-{type}s-{plugin}-receive_buffer_bytes>> |<<number,number >>|No
107
+ | <<plugins-{type}s-{plugin}-reconnect_backoff_ms>> |<<number,number >>|No
108
+ | <<plugins-{type}s-{plugin}-request_timeout_ms>> |<<number,number >>|No
109
+ | <<plugins-{type}s-{plugin}-retry_backoff_ms>> |<<number,number >>|No
108
110
| <<plugins-{type}s-{plugin}-sasl_jaas_config>> |<<string,string>>|No
109
111
| <<plugins-{type}s-{plugin}-sasl_kerberos_service_name>> |<<string,string>>|No
110
112
| <<plugins-{type}s-{plugin}-sasl_mechanism>> |<<string,string>>|No
111
113
| <<plugins-{type}s-{plugin}-security_protocol>> |<<string,string>>, one of `["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"]`|No
112
- | <<plugins-{type}s-{plugin}-send_buffer_bytes>> |<<string,string >>|No
113
- | <<plugins-{type}s-{plugin}-session_timeout_ms>> |<<string,string >>|No
114
+ | <<plugins-{type}s-{plugin}-send_buffer_bytes>> |<<number,number >>|No
115
+ | <<plugins-{type}s-{plugin}-session_timeout_ms>> |<<number,number >>|No
114
116
| <<plugins-{type}s-{plugin}-ssl_endpoint_identification_algorithm>> |<<string,string>>|No
115
117
| <<plugins-{type}s-{plugin}-ssl_key_password>> |<<password,password>>|No
116
118
| <<plugins-{type}s-{plugin}-ssl_keystore_location>> |a valid filesystem path|No
@@ -132,8 +134,8 @@ input plugins.
132
134
[id="plugins-{type}s-{plugin}-auto_commit_interval_ms"]
133
135
===== `auto_commit_interval_ms`
134
136
135
- * Value type is <<string,string >>
136
- * Default value is `" 5000"`
137
+ * Value type is <<number,number >>
138
+ * Default value is `5000`.
137
139
138
140
The frequency in milliseconds that the consumer offsets are committed to Kafka.
139
141
@@ -165,12 +167,12 @@ case a server is down).
165
167
[id="plugins-{type}s-{plugin}-check_crcs"]
166
168
===== `check_crcs`
167
169
168
- * Value type is <<string,string >>
169
- * There is no default value for this setting.
170
+ * Value type is <<boolean,boolean >>
171
+ * Default value is `true`
170
172
171
- Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk
172
- corruption to the messages occurred. This check adds some overhead, so it may be
173
- disabled in cases seeking extreme performance.
173
+ Automatically check the CRC32 of the records consumed.
174
+ This ensures no on- the-wire or on-disk corruption to the messages occurred.
175
+ This check adds some overhead, so it may be disabled in cases seeking extreme performance.
174
176
175
177
[id="plugins-{type}s-{plugin}-client_id"]
176
178
===== `client_id`
@@ -198,8 +200,8 @@ https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+
198
200
[id="plugins-{type}s-{plugin}-connections_max_idle_ms"]
199
201
===== `connections_max_idle_ms`
200
202
201
- * Value type is <<string,string >>
202
- * There is no default value for this setting .
203
+ * Value type is <<number,number >>
204
+ * Default value is `540000` milliseconds (9 minutes) .
203
205
204
206
Close idle connections after the number of milliseconds specified by this config.
205
207
@@ -230,8 +232,8 @@ This will add a field named `kafka` to the logstash event containing the followi
230
232
[id="plugins-{type}s-{plugin}-enable_auto_commit"]
231
233
===== `enable_auto_commit`
232
234
233
- * Value type is <<string,string >>
234
- * Default value is `" true" `
235
+ * Value type is <<boolean,boolean >>
236
+ * Default value is `true`
235
237
236
238
This committed offset will be used when the process fails as the position from
237
239
which the consumption will begin.
@@ -252,8 +254,8 @@ If set to true the only way to receive records from an internal topic is subscri
252
254
[id="plugins-{type}s-{plugin}-fetch_max_bytes"]
253
255
===== `fetch_max_bytes`
254
256
255
- * Value type is <<string,string >>
256
- * There is no default value for this setting.
257
+ * Value type is <<number,number >>
258
+ * Default value is `52428800` (50MB)
257
259
258
260
The maximum amount of data the server should return for a fetch request. This is not an
259
261
absolute maximum, if the first message in the first non-empty partition of the fetch is larger
@@ -262,8 +264,8 @@ than this value, the message will still be returned to ensure that the consumer
262
264
[id="plugins-{type}s-{plugin}-fetch_max_wait_ms"]
263
265
===== `fetch_max_wait_ms`
264
266
265
- * Value type is <<string,string >>
266
- * There is no default value for this setting .
267
+ * Value type is <<number,number >>
268
+ * Default value is `500` milliseconds .
267
269
268
270
The maximum amount of time the server will block before answering the fetch request if
269
271
there isn't sufficient data to immediately satisfy `fetch_min_bytes`. This
@@ -272,7 +274,7 @@ should be less than or equal to the timeout used in `poll_timeout_ms`
272
274
[id="plugins-{type}s-{plugin}-fetch_min_bytes"]
273
275
===== `fetch_min_bytes`
274
276
275
- * Value type is <<string,string >>
277
+ * Value type is <<number,number >>
276
278
* There is no default value for this setting.
277
279
278
280
The minimum amount of data the server should return for a fetch request. If insufficient
@@ -292,8 +294,8 @@ Logstash instances with the same `group_id`
292
294
[id="plugins-{type}s-{plugin}-heartbeat_interval_ms"]
293
295
===== `heartbeat_interval_ms`
294
296
295
- * Value type is <<string,string >>
296
- * There is no default value for this setting .
297
+ * Value type is <<number,number >>
298
+ * Default value is `3000` milliseconds (3 seconds) .
297
299
298
300
The expected time between heartbeats to the consumer coordinator. Heartbeats are used to ensure
299
301
that the consumer's session stays active and to facilitate rebalancing when new
@@ -343,8 +345,8 @@ Java Class used to deserialize the record's key
343
345
[id="plugins-{type}s-{plugin}-max_partition_fetch_bytes"]
344
346
===== `max_partition_fetch_bytes`
345
347
346
- * Value type is <<string,string >>
347
- * There is no default value for this setting .
348
+ * Value type is <<number,number >>
349
+ * Default value is `1048576` (1MB) .
348
350
349
351
The maximum amount of data per-partition the server will return. The maximum total memory used for a
350
352
request will be `#partitions * max.partition.fetch.bytes`. This size must be at least
@@ -355,28 +357,28 @@ to fetch a large message on a certain partition.
355
357
[id="plugins-{type}s-{plugin}-max_poll_interval_ms"]
356
358
===== `max_poll_interval_ms`
357
359
358
- * Value type is <<string,string >>
359
- * There is no default value for this setting .
360
+ * Value type is <<number,number >>
361
+ * Default value is `300000` milliseconds (5 minutes) .
360
362
361
363
The maximum delay between invocations of poll() when using consumer group management. This places
362
364
an upper bound on the amount of time that the consumer can be idle before fetching more records.
363
365
If poll() is not called before expiration of this timeout, then the consumer is considered failed and
364
366
the group will rebalance in order to reassign the partitions to another member.
365
- The value of the configuration `request_timeout_ms` must always be larger than max_poll_interval_ms
367
+ The value of the configuration `request_timeout_ms` must always be larger than ` max_poll_interval_ms`. ???
366
368
367
369
[id="plugins-{type}s-{plugin}-max_poll_records"]
368
370
===== `max_poll_records`
369
371
370
- * Value type is <<string,string >>
371
- * There is no default value for this setting .
372
+ * Value type is <<number,number >>
373
+ * Default value is `500` .
372
374
373
375
The maximum number of records returned in a single call to poll().
374
376
375
377
[id="plugins-{type}s-{plugin}-metadata_max_age_ms"]
376
378
===== `metadata_max_age_ms`
377
379
378
- * Value type is <<string,string >>
379
- * There is no default value for this setting .
380
+ * Value type is <<number,number >>
381
+ * Default value is `300000` milliseconds (5 minutes) .
380
382
381
383
The period of time in milliseconds after which we force a refresh of metadata even if
382
384
we haven't seen any partition leadership changes to proactively discover any new brokers or partitions
@@ -402,23 +404,28 @@ implementations.
402
404
===== `poll_timeout_ms`
403
405
404
406
* Value type is <<number,number>>
405
- * Default value is `100`
407
+ * Default value is `100` milliseconds.
406
408
407
- Time kafka consumer will wait to receive new messages from topics
409
+ Time Kafka consumer will wait to receive new messages from topics.
410
+
411
+ After subscribing to a set of topics, the Kafka consumer automatically joins the group when polling.
412
+ The plugin poll-ing in a loop ensures consumer liveness.
413
+ Underneath the covers, Kafka client sends periodic heartbeats to the server.
414
+ The timeout specified the time to block waiting for input on each poll.
408
415
409
416
[id="plugins-{type}s-{plugin}-receive_buffer_bytes"]
410
417
===== `receive_buffer_bytes`
411
418
412
- * Value type is <<string,string >>
413
- * There is no default value for this setting .
419
+ * Value type is <<number,number >>
420
+ * Default value is `32768` (32KB) .
414
421
415
422
The size of the TCP receive buffer (SO_RCVBUF) to use when reading data.
416
423
417
424
[id="plugins-{type}s-{plugin}-reconnect_backoff_ms"]
418
425
===== `reconnect_backoff_ms`
419
426
420
- * Value type is <<string,string >>
421
- * There is no default value for this setting .
427
+ * Value type is <<number,number >>
428
+ * Default value is `50` milliseconds .
422
429
423
430
The amount of time to wait before attempting to reconnect to a given host.
424
431
This avoids repeatedly connecting to a host in a tight loop.
@@ -427,8 +434,8 @@ This backoff applies to all requests sent by the consumer to the broker.
427
434
[id="plugins-{type}s-{plugin}-request_timeout_ms"]
428
435
===== `request_timeout_ms`
429
436
430
- * Value type is <<string,string >>
431
- * There is no default value for this setting .
437
+ * Value type is <<number,number >>
438
+ * Default value is `40000` milliseconds (40 seconds) .
432
439
433
440
The configuration controls the maximum amount of time the client will wait
434
441
for the response of a request. If the response is not received before the timeout
@@ -438,8 +445,8 @@ retries are exhausted.
438
445
[id="plugins-{type}s-{plugin}-retry_backoff_ms"]
439
446
===== `retry_backoff_ms`
440
447
441
- * Value type is <<string,string >>
442
- * There is no default value for this setting .
448
+ * Value type is <<number,number >>
449
+ * Default value is `100` milliseconds .
443
450
444
451
The amount of time to wait before attempting to retry a failed fetch request
445
452
to a given topic partition. This avoids repeated fetching-and-failing in a tight loop.
@@ -492,16 +499,16 @@ Security protocol to use, which can be either of PLAINTEXT,SSL,SASL_PLAINTEXT,SA
492
499
[id="plugins-{type}s-{plugin}-send_buffer_bytes"]
493
500
===== `send_buffer_bytes`
494
501
495
- * Value type is <<string,string >>
496
- * There is no default value for this setting .
502
+ * Value type is <<number,number >>
503
+ * Default value is `131072` (128KB) .
497
504
498
505
The size of the TCP send buffer (SO_SNDBUF) to use when sending data
499
506
500
507
[id="plugins-{type}s-{plugin}-session_timeout_ms"]
501
508
===== `session_timeout_ms`
502
509
503
- * Value type is <<string,string >>
504
- * There is no default value for this setting .
510
+ * Value type is <<number,number >>
511
+ * Default value is `10000` milliseconds (10 seconds) .
505
512
506
513
The timeout after which, if the `poll_timeout_ms` is not invoked, the consumer is marked dead
507
514
and a rebalance operation is triggered for the group identified by `group_id`
@@ -561,7 +568,7 @@ The JKS truststore path to validate the Kafka broker's certificate.
561
568
* Value type is <<password,password>>
562
569
* There is no default value for this setting.
563
570
564
- The truststore password
571
+ The truststore password.
565
572
566
573
[id="plugins-{type}s-{plugin}-ssl_truststore_type"]
567
574
===== `ssl_truststore_type`
0 commit comments