Skip to content

Commit a2b84cb

Browse files
timtebeekkares
andauthored
Feat: added input isolation_level for fine control of transactional messages (#44)
Co-authored-by: Karol Bucek <[email protected]>
1 parent 05ffbed commit a2b84cb

File tree

5 files changed

+23
-1
lines changed

5 files changed

+23
-1
lines changed

CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 10.4.0
2+
- added the input `isolation_level` to allow fine control of whether to return transactional messages [#44](https://github.com/logstash-plugins/logstash-integration-kafka/pull/44)
3+
14
## 10.3.0
25
- added the input and output `client_dns_lookup` parameter to allow control of how DNS requests are made
36

CONTRIBUTORS

+1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ Contributors:
1212
* Kurt Hurtado (kurtado)
1313
* Ry Biesemeyer (yaauie)
1414
* Rob Cowart (robcowart)
15+
* Tim te Beek (timtebeek)
1516

1617
Note: If you've sent us patches, bug reports, or otherwise contributed to
1718
Logstash, and you aren't on the list above and want to be, please let us know

docs/input-kafka.asciidoc

+12
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ See the https://kafka.apache.org/24/documentation for more details.
9595
| <<plugins-{type}s-{plugin}-fetch_min_bytes>> |<<number,number>>|No
9696
| <<plugins-{type}s-{plugin}-group_id>> |<<string,string>>|No
9797
| <<plugins-{type}s-{plugin}-heartbeat_interval_ms>> |<<number,number>>|No
98+
| <<plugins-{type}s-{plugin}-isolation_level>> |<<string,string>>|No
9899
| <<plugins-{type}s-{plugin}-jaas_path>> |a valid filesystem path|No
99100
| <<plugins-{type}s-{plugin}-kerberos_config>> |a valid filesystem path|No
100101
| <<plugins-{type}s-{plugin}-key_deserializer_class>> |<<string,string>>|No
@@ -315,6 +316,17 @@ consumers join or leave the group. The value must be set lower than
315316
`session.timeout.ms`, but typically should be set no higher than 1/3 of that value.
316317
It can be adjusted even lower to control the expected time for normal rebalances.
317318

319+
[id="plugins-{type}s-{plugin}-isolation_level"]
320+
===== `isolation_level`
321+
322+
* Value type is <<string,string>>
323+
* Default value is `"read_uncommitted"`
324+
325+
Controls how to read messages written transactionally. If set to `read_committed`, polling messages will only return
326+
transactional messages which have been committed. If set to `read_uncommitted` (the default), polling messages will
327+
return all messages, even transactional messages which have been aborted. Non-transactional messages will be returned
328+
unconditionally in either mode.
329+
318330
[id="plugins-{type}s-{plugin}-jaas_path"]
319331
===== `jaas_path`
320332

lib/logstash/inputs/kafka.rb

+6
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,11 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base
114114
# `session.timeout.ms`, but typically should be set no higher than 1/3 of that value.
115115
# It can be adjusted even lower to control the expected time for normal rebalances.
116116
config :heartbeat_interval_ms, :validate => :number, :default => 3000 # Kafka default
117+
# Controls how to read messages written transactionally. If set to read_committed, consumer.poll()
118+
# will only return transactional messages which have been committed. If set to read_uncommitted'
119+
# (the default), consumer.poll() will return all messages, even transactional messages which have
120+
# been aborted. Non-transactional messages will be returned unconditionally in either mode.
121+
config :isolation_level, :validate => ["read_uncommitted", "read_committed"], :default => "read_uncommitted" # Kafka default
117122
# Java Class used to deserialize the record's key
118123
config :key_deserializer_class, :validate => :string, :default => "org.apache.kafka.common.serialization.StringDeserializer"
119124
# The maximum delay between invocations of poll() when using consumer group management. This places
@@ -311,6 +316,7 @@ def create_consumer(client_id)
311316
props.put(kafka::FETCH_MIN_BYTES_CONFIG, fetch_min_bytes.to_s) unless fetch_min_bytes.nil?
312317
props.put(kafka::GROUP_ID_CONFIG, group_id)
313318
props.put(kafka::HEARTBEAT_INTERVAL_MS_CONFIG, heartbeat_interval_ms.to_s) unless heartbeat_interval_ms.nil?
319+
props.put(kafka::ISOLATION_LEVEL_CONFIG, isolation_level)
314320
props.put(kafka::KEY_DESERIALIZER_CLASS_CONFIG, key_deserializer_class)
315321
props.put(kafka::MAX_PARTITION_FETCH_BYTES_CONFIG, max_partition_fetch_bytes.to_s) unless max_partition_fetch_bytes.nil?
316322
props.put(kafka::MAX_POLL_RECORDS_CONFIG, max_poll_records.to_s) unless max_poll_records.nil?

logstash-integration-kafka.gemspec

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Gem::Specification.new do |s|
22
s.name = 'logstash-integration-kafka'
3-
s.version = '10.3.0'
3+
s.version = '10.4.0'
44
s.licenses = ['Apache-2.0']
55
s.summary = "Integration with Kafka - input and output plugins"
66
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline "+

0 commit comments

Comments
 (0)