Skip to content

Commit aac4b0b

Browse files
committed
sort aborted_transactions by first_offset
1 parent 4f932ca commit aac4b0b

File tree

1 file changed

+3
-2
lines changed

1 file changed

+3
-2
lines changed

kafka/consumer/fetcher.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -867,7 +867,7 @@ class PartitionRecords(object):
867867
def __init__(self, fetch_offset, tp, records,
868868
key_deserializer=None, value_deserializer=None,
869869
check_crcs=True, isolation_level=READ_UNCOMMITTED,
870-
aborted_transactions=None, # list of (producer_id, first_offset) tuples
870+
aborted_transactions=None, # raw data from response / list of (producer_id, first_offset) tuples
871871
metric_aggregator=None, on_drain=lambda x: None):
872872
self.fetch_offset = fetch_offset
873873
self.topic_partition = tp
@@ -878,7 +878,8 @@ def __init__(self, fetch_offset, tp, records,
878878
self.isolation_level = isolation_level
879879
self.aborted_producer_ids = set()
880880
self.aborted_transactions = collections.deque(
881-
[AbortedTransaction(*data) for data in aborted_transactions] if aborted_transactions else []
881+
sorted([AbortedTransaction(*data) for data in aborted_transactions] if aborted_transactions else [],
882+
key=lambda txn: txn.first_offset)
882883
)
883884
self.metric_aggregator = metric_aggregator
884885
self.check_crcs = check_crcs

0 commit comments

Comments
 (0)