@@ -39,6 +39,7 @@ class Fetcher(six.Iterator):
         'fetch_max_wait_ms': 500,
         'max_partition_fetch_bytes': 1048576,
         'check_crcs': True,
+        'skip_double_compressed_messages': False,
         'iterator_refetch_records': 1,  # undocumented -- interface may change
         'api_version': (0, 8, 0),
     }
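As a side note, the new key is only a default: it takes effect through the standard DEFAULT_CONFIG merge at the top of Fetcher.__init__ (visible as context lines at the end of the next hunk). A minimal standalone sketch of that idiom, with hypothetical names and a trimmed-down config dict:

# Sketch of the DEFAULT_CONFIG merge idiom used by Fetcher.__init__;
# simplified and standalone for illustration only.
import copy

DEFAULT_CONFIG = {'check_crcs': True, 'skip_double_compressed_messages': False}

def merge_config(configs):
    config = copy.copy(DEFAULT_CONFIG)
    for key in config:
        if key in configs:
            config[key] = configs.pop(key)  # pop so leftover keys can be detected
    return config

assert merge_config({})['skip_double_compressed_messages'] is False
assert merge_config({'skip_double_compressed_messages': True})['skip_double_compressed_messages'] is True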
@@ -71,6 +72,13 @@ def __init__(self, client, subscriptions, metrics, metric_group_prefix,
                 consumed. This ensures no on-the-wire or on-disk corruption to
                 the messages occurred. This check adds some overhead, so it may
                 be disabled in cases seeking extreme performance. Default: True
+            skip_double_compressed_messages (bool): A bug in KafkaProducer
+                caused some messages to be corrupted via double-compression.
+                By default, the fetcher will return the messages as a compressed
+                blob of bytes with a single offset, i.e. how the message was
+                actually published to the cluster. If you prefer to have the
+                fetcher automatically detect corrupt messages and skip them,
+                set this option to True. Default: False.
         """
         self.config = copy.copy(self.DEFAULT_CONFIG)
         for key in self.config:
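For reference, a short usage sketch. This assumes the commit also registers the new key in KafkaConsumer's own config (not shown in this hunk) so it can be passed through as a keyword argument; the other option names below are existing kafka-python settings:

# Sketch: enabling the new option from the consumer side. Assumes
# 'skip_double_compressed_messages' is also added to KafkaConsumer's
# DEFAULT_CONFIG so it reaches the Fetcher.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers='localhost:9092',
    check_crcs=True,                       # validate message CRCs (the default)
    skip_double_compressed_messages=True,  # skip corrupt double-compressed sets
)

for record in consumer:
    print(record.topic, record.partition, record.offset, record.value)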
@@ -352,33 +360,64 @@ def fetched_records(self):
                           position)
         return dict(drained)

-    def _unpack_message_set(self, tp, messages, relative_offset=0):
+    def _unpack_message_set(self, tp, messages):
         try:
             for offset, size, msg in messages:
                 if self.config['check_crcs'] and not msg.validate_crc():
                     raise Errors.InvalidMessageError(msg)
                 elif msg.is_compressed():
-                    mset = msg.decompress()
-                    # new format uses relative offsets for compressed messages
+                    # If relative offsets are used, we need to decompress the
+                    # entire message set first to compute the absolute offsets.
+                    inner_mset = msg.decompress()
+
+                    # There should only ever be a single layer of compression
+                    if inner_mset[0][-1].is_compressed():
+                        log.warning('MessageSet at %s offset %d appears'
+                                    ' double-compressed. This should not'
+                                    ' happen -- check your producers!',
+                                    tp, offset)
+                        if self.config['skip_double_compressed_messages']:
+                            log.warning('Skipping double-compressed message at'
+                                        ' %s %d', tp, offset)
+                            continue
+
                     if msg.magic > 0:
-                        last_offset, _, _ = mset[-1]
-                        relative = offset - last_offset
+                        last_offset, _, _ = inner_mset[-1]
+                        absolute_base_offset = offset - last_offset
                     else:
-                        relative = 0
-                    for record in self._unpack_message_set(tp, mset, relative):
-                        yield record
+                        absolute_base_offset = -1
+
+                    for inner_offset, inner_size, inner_msg in inner_mset:
+                        if msg.magic > 0:
+                            # When the magic value is greater than 0, the
+                            # timestamp of a compressed message depends on the
+                            # timestamp type of the wrapper message:
+
+                            if msg.timestamp_type == 0:  # CREATE_TIME (0)
+                                inner_timestamp = inner_msg.timestamp
+
+                            elif msg.timestamp_type == 1:  # LOG_APPEND_TIME (1)
+                                inner_timestamp = msg.timestamp
+
+                            else:
+                                raise ValueError('Unknown timestamp type: {}'.format(msg.timestamp_type))
+                        else:
+                            inner_timestamp = msg.timestamp
+
+                        if absolute_base_offset >= 0:
+                            inner_offset += absolute_base_offset
+
+                        key, value = self._deserialize(inner_msg)
+                        yield ConsumerRecord(tp.topic, tp.partition, inner_offset,
+                                             inner_timestamp, msg.timestamp_type,
+                                             key, value)
+
                 else:
-                    # Message v1 adds timestamp
-                    if msg.magic > 0:
-                        timestamp = msg.timestamp
-                        timestamp_type = msg.timestamp_type
-                    else:
-                        timestamp = timestamp_type = None
                     key, value = self._deserialize(msg)
-                    yield ConsumerRecord(tp.topic, tp.partition,
-                                         offset + relative_offset,
-                                         timestamp, timestamp_type,
+                    yield ConsumerRecord(tp.topic, tp.partition, offset,
+                                         msg.timestamp, msg.timestamp_type,
                                          key, value)
+
             # If unpacking raises StopIteration, it is erroneously
             # caught by the generator. We want all exceptions to be raised
             # back to the user. See Issue 545
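To make the relative-offset and timestamp logic concrete, here is a small standalone sketch using plain tuples rather than kafka-python's message objects; all names and values are hypothetical:

# A v1 wrapper message is stored at the absolute offset of its *last* inner
# message, so the base offset is wrapper_offset minus the last relative offset.
CREATE_TIME, LOG_APPEND_TIME = 0, 1

def resolve_inner(wrapper_offset, wrapper_ts, wrapper_ts_type, inner_mset):
    """inner_mset: list of (relative_offset, timestamp, value) tuples."""
    last_relative = inner_mset[-1][0]
    absolute_base_offset = wrapper_offset - last_relative
    for rel_offset, ts, value in inner_mset:
        if wrapper_ts_type == LOG_APPEND_TIME:
            ts = wrapper_ts  # inner messages inherit the broker's timestamp
        yield (absolute_base_offset + rel_offset, ts, value)

# Wrapper at absolute offset 1002, inner relative offsets 0..2 -> 1000..1002.
inner = [(0, 111, b'a'), (1, 222, b'b'), (2, 333, b'c')]
assert list(resolve_inner(1002, 999, CREATE_TIME, inner)) == [
    (1000, 111, b'a'), (1001, 222, b'b'), (1002, 333, b'c')]
assert [ts for _, ts, _ in resolve_inner(1002, 999, LOG_APPEND_TIME, inner)] == [999, 999, 999]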