98 changes: 81 additions & 17 deletions kafka/consumer/fetcher.py
@@ -12,7 +12,7 @@
import kafka.errors as Errors
from kafka.future import Future
from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.protocol.fetch import FetchRequest
from kafka.protocol.fetch import FetchRequest, AbortedTransaction
from kafka.protocol.list_offsets import (
ListOffsetsRequest, OffsetResetStrategy, UNKNOWN_OFFSET
)
@@ -28,6 +28,11 @@
READ_UNCOMMITTED = 0
READ_COMMITTED = 1

ISOLATION_LEVEL_CONFIG = {
'read_uncommitted': READ_UNCOMMITTED,
'read_committed': READ_COMMITTED,
}

ConsumerRecord = collections.namedtuple("ConsumerRecord",
["topic", "partition", "leader_epoch", "offset", "timestamp", "timestamp_type",
"key", "value", "headers", "checksum", "serialized_key_size", "serialized_value_size", "serialized_header_size"])
@@ -60,6 +65,7 @@ class Fetcher(six.Iterator):
'metric_group_prefix': 'consumer',
'retry_backoff_ms': 100,
'enable_incremental_fetch_sessions': True,
'isolation_level': 'read_uncommitted',
}

def __init__(self, client, subscriptions, **configs):
@@ -100,12 +106,18 @@ def __init__(self, client, subscriptions, **configs):
consumed. This ensures no on-the-wire or on-disk corruption to
the messages occurred. This check adds some overhead, so it may
be disabled in cases seeking extreme performance. Default: True
isolation_level (str): Configure KIP-98 transactional consumer by
setting to 'read_committed'. This will cause the consumer to
skip records from aborted transactions. Default: 'read_uncommitted'
"""
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
if key in configs:
self.config[key] = configs[key]

if self.config['isolation_level'] not in ISOLATION_LEVEL_CONFIG:
raise Errors.KafkaConfigurationError('Unrecognized isolation_level')

self._client = client
self._subscriptions = subscriptions
self._completed_fetches = collections.deque() # Unparsed responses
@@ -116,7 +128,7 @@ def __init__(self, client, subscriptions, **configs):
self._sensors = FetchManagerMetrics(self.config['metrics'], self.config['metric_group_prefix'])
else:
self._sensors = None
self._isolation_level = READ_UNCOMMITTED
self._isolation_level = ISOLATION_LEVEL_CONFIG[self.config['isolation_level']]
self._session_handlers = {}
self._nodes_with_pending_fetch_requests = set()
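
A minimal sketch (not part of this diff) of how the new setting flows from config string to the wire-level flag, using the ISOLATION_LEVEL_CONFIG map and constants introduced above; unknown values fail fast at construction time:

# Illustrative only: 'read_committed' resolves to READ_COMMITTED (1) and is
# later sent as the isolation_level field of newer FetchRequest versions.
isolation_level = 'read_committed'
if isolation_level not in ISOLATION_LEVEL_CONFIG:
    raise Errors.KafkaConfigurationError('Unrecognized isolation_level')
assert ISOLATION_LEVEL_CONFIG[isolation_level] == READ_COMMITTED == 1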

@@ -244,7 +256,7 @@ def _reset_offset(self, partition, timeout_ms=None):
else:
raise NoOffsetForPartitionError(partition)

log.debug("Resetting offset for partition %s to %s offset.",
log.debug("Resetting offset for partition %s to offset %s.",
partition, strategy)
offsets = self._retrieve_offsets({partition: timestamp}, timeout_ms=timeout_ms)

@@ -765,14 +777,21 @@ def _parse_fetched_data(self, completed_fetch):
return None

records = MemoryRecords(completed_fetch.partition_data[-1])
aborted_transactions = None
if completed_fetch.response_version >= 11:
aborted_transactions = completed_fetch.partition_data[-3]
elif completed_fetch.response_version >= 4:
aborted_transactions = completed_fetch.partition_data[-2]
log.debug("Preparing to read %s bytes of data for partition %s with offset %d",
records.size_in_bytes(), tp, fetch_offset)
parsed_records = self.PartitionRecords(fetch_offset, tp, records,
self.config['key_deserializer'],
self.config['value_deserializer'],
self.config['check_crcs'],
completed_fetch.metric_aggregator,
self._on_partition_records_drain)
key_deserializer=self.config['key_deserializer'],
value_deserializer=self.config['value_deserializer'],
check_crcs=self.config['check_crcs'],
isolation_level=self._isolation_level,
aborted_transactions=aborted_transactions,
metric_aggregator=completed_fetch.metric_aggregator,
on_drain=self._on_partition_records_drain)
if not records.has_next() and records.size_in_bytes() > 0:
if completed_fetch.response_version < 3:
# Implement the pre KIP-74 behavior of throwing a RecordTooLargeException.
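
An illustrative note on the index arithmetic above (field layout assumed, not shown in this diff): the record bytes are always the last element of partition_data, and aborted_transactions sits just before them until v11, which appears to insert preferred_read_replica in between:

# Hypothetical v11 partition_data tail, for orientation only:
partition_data_v11 = ('partition', 'error_code', 'highwater', 'last_stable_offset',
                      'log_start_offset',
                      [(2001, 5)],              # aborted_transactions (producer_id, first_offset)
                      -1,                       # preferred_read_replica (v11+)
                      b'<record batch bytes>')  # records, always at [-1]
assert partition_data_v11[-3] == [(2001, 5)]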
@@ -845,13 +864,23 @@ def close(self):
self._next_partition_records.drain()

class PartitionRecords(object):
def __init__(self, fetch_offset, tp, records, key_deserializer, value_deserializer, check_crcs, metric_aggregator, on_drain):
def __init__(self, fetch_offset, tp, records,
key_deserializer=None, value_deserializer=None,
check_crcs=True, isolation_level=READ_UNCOMMITTED,
aborted_transactions=None, # raw data from response / list of (producer_id, first_offset) tuples
metric_aggregator=None, on_drain=lambda x: None):
self.fetch_offset = fetch_offset
self.topic_partition = tp
self.leader_epoch = -1
self.next_fetch_offset = fetch_offset
self.bytes_read = 0
self.records_read = 0
self.isolation_level = isolation_level
self.aborted_producer_ids = set()
self.aborted_transactions = collections.deque(
sorted([AbortedTransaction(*data) for data in aborted_transactions] if aborted_transactions else [],
key=lambda txn: txn.first_offset)
)
self.metric_aggregator = metric_aggregator
self.check_crcs = check_crcs
self.record_iterator = itertools.dropwhile(
@@ -900,18 +929,35 @@ def _unpack_records(self, tp, records, key_deserializer, value_deserializer):
"Record batch for partition %s at offset %s failed crc check" % (
self.topic_partition, batch.base_offset))


# Try DefaultsRecordBatch / message log format v2
# base_offset, last_offset_delta, and control batches
# base_offset, last_offset_delta, aborted transactions, and control batches
if batch.magic == 2:
self.leader_epoch = batch.leader_epoch
if self.isolation_level == READ_COMMITTED and batch.has_producer_id():
# remove from the aborted transaction queue all aborted transactions which have begun
# before the current batch's last offset and add the associated producerIds to the
# aborted producer set
self._consume_aborted_transactions_up_to(batch.last_offset)

producer_id = batch.producer_id
if self._contains_abort_marker(batch):
try:
self.aborted_producer_ids.remove(producer_id)
except KeyError:
pass
elif self._is_batch_aborted(batch):
log.debug("Skipping aborted record batch from partition %s with producer_id %s and"
" offsets %s to %s",
self.topic_partition, producer_id, batch.base_offset, batch.last_offset)
self.next_fetch_offset = batch.next_offset
batch = records.next_batch()
continue

# Control batches have a single record indicating whether a transaction
# was aborted or committed.
# When isolation_level is READ_COMMITTED (currently unsupported)
# we should also skip all messages from aborted transactions
# For now we only support READ_UNCOMMITTED and so we ignore the
# abort/commit signal.
# was aborted or committed. These are not returned to the consumer.
if batch.is_control_batch:
self.next_fetch_offset = next(batch).offset + 1
self.next_fetch_offset = batch.next_offset
batch = records.next_batch()
continue

@@ -944,7 +990,7 @@ def _unpack_records(self, tp, records, key_deserializer, value_deserializer):
# unnecessary re-fetching of the same batch (in the worst case, the consumer could get stuck
# fetching the same batch repeatedly).
if last_batch and last_batch.magic == 2:
self.next_fetch_offset = last_batch.base_offset + last_batch.last_offset_delta + 1
self.next_fetch_offset = last_batch.next_offset
self.drain()

# If unpacking raises StopIteration, it is erroneously
@@ -961,6 +1007,24 @@ def _deserialize(self, f, topic, bytes_):
return f.deserialize(topic, bytes_)
return f(bytes_)

def _consume_aborted_transactions_up_to(self, offset):
if not self.aborted_transactions:
return

while self.aborted_transactions and self.aborted_transactions[0].first_offset <= offset:
self.aborted_producer_ids.add(self.aborted_transactions.popleft().producer_id)

def _is_batch_aborted(self, batch):
return batch.is_transactional and batch.producer_id in self.aborted_producer_ids

def _contains_abort_marker(self, batch):
if not batch.is_control_batch:
return False
record = next(batch)
if not record:
return False
return record.abort
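
To make the read_committed bookkeeping above easier to follow, here is a standalone, simplified sketch of the same algorithm with hypothetical Batch objects (the real code walks MemoryRecords batches and also drops commit markers via the control-batch check):

import collections

AbortedTxn = collections.namedtuple("AbortedTxn", ["producer_id", "first_offset"])
Batch = collections.namedtuple(
    "Batch", ["producer_id", "last_offset", "is_transactional", "is_abort_marker"])

def read_committed_filter(batches, aborted):
    """Yield only batches that are not part of an aborted transaction."""
    pending = collections.deque(sorted(aborted, key=lambda txn: txn.first_offset))
    aborted_producers = set()
    for batch in batches:
        # Activate every aborted transaction that begins at or before this batch.
        while pending and pending[0].first_offset <= batch.last_offset:
            aborted_producers.add(pending.popleft().producer_id)
        if batch.is_abort_marker:
            aborted_producers.discard(batch.producer_id)  # marker closes the txn
            continue                                      # control batches are never returned
        if batch.is_transactional and batch.producer_id in aborted_producers:
            continue                                      # skip aborted data
        yield batch

batches = [
    Batch(producer_id=2001, last_offset=5, is_transactional=True, is_abort_marker=False),
    Batch(producer_id=2001, last_offset=6, is_transactional=True, is_abort_marker=True),
    Batch(producer_id=2002, last_offset=9, is_transactional=True, is_abort_marker=False),
]
survivors = list(read_committed_filter(batches, [AbortedTxn(producer_id=2001, first_offset=3)]))
assert [b.producer_id for b in survivors] == [2002]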


class FetchSessionHandler(object):
"""
4 changes: 4 additions & 0 deletions kafka/consumer/group.py
@@ -121,6 +121,9 @@ class KafkaConsumer(six.Iterator):
consumed. This ensures no on-the-wire or on-disk corruption to
the messages occurred. This check adds some overhead, so it may
be disabled in cases seeking extreme performance. Default: True
isolation_level (str): Configure KIP-98 transactional consumer by
setting to 'read_committed'. This will cause the consumer to
skip records from aborted transactions. Default: 'read_uncommitted'
allow_auto_create_topics (bool): Enable/disable auto topic creation
on metadata request. Only available with api_version >= (0, 11).
Default: True
@@ -290,6 +293,7 @@ class KafkaConsumer(six.Iterator):
'auto_commit_interval_ms': 5000,
'default_offset_commit_callback': lambda offsets, response: True,
'check_crcs': True,
'isolation_level': 'read_uncommitted',
'allow_auto_create_topics': True,
'metadata_max_age_ms': 5 * 60 * 1000,
'partition_assignment_strategy': (RangePartitionAssignor, RoundRobinPartitionAssignor),
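
For context, the user-facing surface of this change is just the new constructor argument; a minimal usage sketch (topic and broker address are placeholders):

from kafka import KafkaConsumer

# In read_committed mode only records outside aborted transactions are returned.
consumer = KafkaConsumer(
    'my-topic',                          # placeholder topic
    bootstrap_servers='localhost:9092',  # placeholder broker
    isolation_level='read_committed',
)
for message in consumer:
    print(message.offset, message.value)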
2 changes: 1 addition & 1 deletion kafka/consumer/subscription_state.py
@@ -1,7 +1,7 @@
from __future__ import absolute_import

import abc
from collections import defaultdict, OrderedDict
from collections import OrderedDict
try:
from collections.abc import Sequence
except ImportError:
6 changes: 6 additions & 0 deletions kafka/protocol/fetch.py
@@ -1,9 +1,15 @@
from __future__ import absolute_import

import collections

from kafka.protocol.api import Request, Response
from kafka.protocol.types import Array, Int8, Int16, Int32, Int64, Schema, String, Bytes


AbortedTransaction = collections.namedtuple("AbortedTransaction",
["producer_id", "first_offset"])


class FetchResponse_v0(Response):
API_KEY = 1
API_VERSION = 0
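
A small sketch of how the new namedtuple is consumed by PartitionRecords earlier in this diff: raw (producer_id, first_offset) pairs from the response are wrapped and sorted by first_offset (example values):

raw = [(2002, 42), (2001, 5)]
txns = sorted((AbortedTransaction(*t) for t in raw), key=lambda txn: txn.first_offset)
assert txns[0] == AbortedTransaction(producer_id=2001, first_offset=5)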
10 changes: 10 additions & 0 deletions kafka/record/abc.py
@@ -110,6 +110,16 @@ def __iter__(self):
if needed.
"""

@abc.abstractproperty
def base_offset(self):
""" Return base offset for batch
"""

@abc.abstractproperty
def size_in_bytes(self):
""" Return size of batch in bytes (includes header overhead)
"""

@abc.abstractproperty
def magic(self):
""" Return magic value (0, 1, 2) for batch.
45 changes: 45 additions & 0 deletions kafka/record/default_records.py
@@ -104,6 +104,9 @@ class DefaultRecordBase(object):

LOG_APPEND_TIME = 1
CREATE_TIME = 0
NO_PRODUCER_ID = -1
NO_SEQUENCE = -1
MAX_INT = 2147483647

def _assert_has_codec(self, compression_type):
if compression_type == self.CODEC_GZIP:
Expand Down Expand Up @@ -136,6 +139,10 @@ def __init__(self, buffer):
def base_offset(self):
return self._header_data[0]

@property
def size_in_bytes(self):
return self._header_data[1] + self.AFTER_LEN_OFFSET

@property
def leader_epoch(self):
return self._header_data[2]
@@ -156,6 +163,14 @@ def attributes(self):
def last_offset_delta(self):
return self._header_data[6]

@property
def last_offset(self):
return self.base_offset + self.last_offset_delta

@property
def next_offset(self):
return self.last_offset + 1

@property
def compression_type(self):
return self.attributes & self.CODEC_MASK
@@ -180,6 +195,36 @@ def first_timestamp(self):
def max_timestamp(self):
return self._header_data[8]

@property
def producer_id(self):
return self._header_data[9]

def has_producer_id(self):
return self.producer_id > self.NO_PRODUCER_ID

@property
def producer_epoch(self):
return self._header_data[10]

@property
def base_sequence(self):
return self._header_data[11]

@property
def last_sequence(self):
if self.base_sequence == self.NO_SEQUENCE:
return self.NO_SEQUENCE
return self._increment_sequence(self.base_sequence, self.last_offset_delta)

def _increment_sequence(self, base, increment):
if base > (self.MAX_INT - increment):
return increment - (self.MAX_INT - base) - 1
return base + increment

@property
def records_count(self):
return self._header_data[12]

def _maybe_uncompress(self):
if not self._decompressed:
compression_type = self.compression_type
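
The last_sequence and _increment_sequence additions account for int32 wrap-around of producer sequence numbers; a quick worked example mirroring the method above (MAX_INT = 2147483647):

MAX_INT = 2147483647

def increment_sequence(base, increment):
    # Same arithmetic as DefaultRecordBatch._increment_sequence above.
    if base > (MAX_INT - increment):
        return increment - (MAX_INT - base) - 1
    return base + increment

assert increment_sequence(100, 3) == 103        # normal case
assert increment_sequence(2147483646, 3) == 1   # wraps past MAX_INT to 0, then 1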
11 changes: 10 additions & 1 deletion kafka/record/legacy_records.py
@@ -129,7 +129,7 @@ def _assert_has_codec(self, compression_type):

class LegacyRecordBatch(ABCRecordBatch, LegacyRecordBase):

__slots__ = ("_buffer", "_magic", "_offset", "_crc", "_timestamp",
__slots__ = ("_buffer", "_magic", "_offset", "_length", "_crc", "_timestamp",
"_attributes", "_decompressed")

def __init__(self, buffer, magic):
Expand All @@ -141,11 +141,20 @@ def __init__(self, buffer, magic):
assert magic == magic_

self._offset = offset
self._length = length
self._crc = crc
self._timestamp = timestamp
self._attributes = attrs
self._decompressed = False

@property
def base_offset(self):
return self._offset

@property
def size_in_bytes(self):
return self._length + self.LOG_OVERHEAD

@property
def timestamp_type(self):
"""0 for CreateTime; 1 for LogAppendTime; None if unsupported.
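
For the legacy batch the new size_in_bytes property rebuilds the full wire size from the stored length field; a tiny sketch, assuming LOG_OVERHEAD is the usual 12-byte offset-plus-length prefix of the legacy format:

LOG_OVERHEAD = 12  # assumed: 8-byte offset + 4-byte length prefix

def legacy_size_in_bytes(length_field):
    # Mirrors LegacyRecordBatch.size_in_bytes: the length field excludes the prefix.
    return length_field + LOG_OVERHEAD

assert legacy_size_in_bytes(34) == 46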