@@ -39,6 +39,7 @@ class Fetcher(six.Iterator):
         'fetch_max_wait_ms': 500,
         'max_partition_fetch_bytes': 1048576,
         'check_crcs': True,
+        'skip_double_compressed_messages': False,
         'iterator_refetch_records': 1, # undocumented -- interface may change
         'api_version': (0, 8, 0),
     }
@@ -71,6 +72,13 @@ def __init__(self, client, subscriptions, metrics, metric_group_prefix,
                 consumed. This ensures no on-the-wire or on-disk corruption to
                 the messages occurred. This check adds some overhead, so it may
                 be disabled in cases seeking extreme performance. Default: True
+            skip_double_compressed_messages (bool): A bug in KafkaProducer
+                caused some messages to be corrupted via double-compression.
+                By default, the fetcher will return the messages as a compressed
+                blob of bytes with a single offset, i.e. how the message was
+                actually published to the cluster. If you prefer to have the
+                fetcher automatically detect corrupt messages and skip them,
+                set this option to True. Default: False.
         """
         self.config = copy.copy(self.DEFAULT_CONFIG)
         for key in self.config:
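A possible way to enable the new option from the consumer side is sketched below. This assumes the consumer-level config also exposes `skip_double_compressed_messages` and forwards it to the Fetcher (only `fetcher.py` is shown in this diff); the broker address and topic name are placeholders.

```python
from kafka import KafkaConsumer

# Placeholder topic and broker address; adjust for your deployment.
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers='localhost:9092',
    # Drop records that a buggy producer double-compressed, instead of
    # returning the still-compressed blob of bytes at a single offset.
    skip_double_compressed_messages=True,
)

for message in consumer:
    # Only cleanly decoded records reach this loop when skipping is enabled.
    print(message.topic, message.partition, message.offset, message.value)
```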
@@ -368,6 +376,10 @@ def _unpack_message_set(self, tp, messages):
                                     ' double-compressed. This should not'
                                     ' happen -- check your producers!',
                                     tp, offset)
+                        if self.config['skip_double_compressed_messages']:
+                            log.warning('Skipping double-compressed message at'
+                                        ' %s %d', tp, offset)
+                            continue
 
                     if msg.magic > 0:
                         last_offset, _, _ = inner_mset[-1]
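To make the new branch concrete, here is a minimal self-contained sketch of the decision. The `Message` tuple and `decompress` helper are toy stand-ins, not kafka-python's actual types; the point is only how the flag changes what gets yielded when a decompressed set still contains compressed records.

```python
from collections import namedtuple

Message = namedtuple('Message', ['offset', 'payload', 'is_compressed'])

def decompress(msg):
    # Toy stand-in: a compressed message's payload is a list of inner
    # messages, which may themselves still be compressed if a buggy
    # producer double-compressed them.
    return msg.payload

def unpack_message_set(messages, skip_double_compressed_messages=False):
    for msg in messages:
        if not msg.is_compressed:
            yield msg
            continue
        inner = decompress(msg)
        if inner[0].is_compressed and skip_double_compressed_messages:
            # Drop the corrupt (double-compressed) record entirely.
            continue
        # Otherwise yield the inner messages; for a double-compressed record
        # the value is still a compressed blob, exactly as published.
        for inner_msg in inner:
            yield inner_msg

# One healthy record and one double-compressed record.
healthy = Message(0, b'ok', False)
corrupt = Message(1, [Message(0, b'??', True)], True)
print(list(unpack_message_set([healthy, corrupt])))   # blob returned as-is
print(list(unpack_message_set([healthy, corrupt],
                              skip_double_compressed_messages=True)))  # blob skipped
```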