1010
1111from kafka .admin .acl_resource import ACLOperation , ACLPermissionType , ACLFilter , ACL , ResourcePattern , ResourceType , \
1212 ACLResourcePatternType
13+ from kafka .admin .leader_election_resources import ElectionType
1314from kafka .client_async import KafkaClient , selectors
1415from kafka .coordinator .protocol import ConsumerProtocolMemberMetadata , ConsumerProtocolMemberAssignment , ConsumerProtocol
1516import kafka .errors as Errors
2021from kafka .protocol .admin import (
2122 CreateTopicsRequest , DeleteTopicsRequest , DescribeConfigsRequest , AlterConfigsRequest , CreatePartitionsRequest ,
2223 ListGroupsRequest , DescribeGroupsRequest , DescribeAclsRequest , CreateAclsRequest , DeleteAclsRequest ,
23- DeleteGroupsRequest
24+ DeleteGroupsRequest , ElectLeadersRequest
2425)
2526from kafka .protocol .commit import GroupCoordinatorRequest , OffsetFetchRequest
2627from kafka .protocol .metadata import MetadataRequest
@@ -393,27 +394,55 @@ def _send_request_to_controller(self, request):
393394 # So this is a little brittle in that it assumes all responses have
394395 # one of these attributes and that they always unpack into
395396 # (topic, error_code) tuples.
396- topic_error_tuples = (response .topic_errors if hasattr (response , 'topic_errors' )
397- else response .topic_error_codes )
398- # Also small py2/py3 compatibility -- py3 can ignore extra values
399- # during unpack via: for x, y, *rest in list_of_values. py2 cannot.
400- # So for now we have to map across the list and explicitly drop any
401- # extra values (usually the error_message)
402- for topic , error_code in map (lambda e : e [:2 ], topic_error_tuples ):
397+ topic_error_tuples = getattr (response , 'topic_errors' , getattr (response , 'topic_error_codes' , None ))
398+ if topic_error_tuples is not None :
399+ success = self ._parse_topic_request_response (topic_error_tuples , request , response , tries )
400+ else :
401+ # Leader Election request has a two layer error response (topic and partition)
402+ success = self ._parse_topic_partition_request_response (request , response , tries )
403+
404+ if success :
405+ return response
406+ raise RuntimeError ("This should never happen, please file a bug with full stacktrace if encountered" )
407+
408+ def _parse_topic_request_response (self , topic_error_tuples , request , response , tries ):
409+ # Also small py2/py3 compatibility -- py3 can ignore extra values
410+ # during unpack via: for x, y, *rest in list_of_values. py2 cannot.
411+ # So for now we have to map across the list and explicitly drop any
412+ # extra values (usually the error_message)
413+ for topic , error_code in map (lambda e : e [:2 ], topic_error_tuples ):
414+ error_type = Errors .for_code (error_code )
415+ if tries and error_type is NotControllerError :
416+ # No need to inspect the rest of the errors for
417+ # non-retriable errors because NotControllerError should
418+ # either be thrown for all errors or no errors.
419+ self ._refresh_controller_id ()
420+ return False
421+ elif error_type is not Errors .NoError :
422+ raise error_type (
423+ "Request '{}' failed with response '{}'."
424+ .format (request , response ))
425+ return True
426+
427+ def _parse_topic_partition_request_response (self , request , response , tries ):
428+ # Also small py2/py3 compatibility -- py3 can ignore extra values
429+ # during unpack via: for x, y, *rest in list_of_values. py2 cannot.
430+ # So for now we have to map across the list and explicitly drop any
431+ # extra values (usually the error_message)
432+ for topic , partition_results in response .replication_election_results :
433+ for partition_id , error_code in map (lambda e : e [:2 ], partition_results ):
403434 error_type = Errors .for_code (error_code )
404435 if tries and error_type is NotControllerError :
405436 # No need to inspect the rest of the errors for
406437 # non-retriable errors because NotControllerError should
407438 # either be thrown for all errors or no errors.
408439 self ._refresh_controller_id ()
409- break
410- elif error_type is not Errors .NoError :
440+ return False
441+ elif error_type not in [ Errors .NoError , Errors . ElectionNotNeeded ] :
411442 raise error_type (
412443 "Request '{}' failed with response '{}'."
413444 .format (request , response ))
414- else :
415- return response
416- raise RuntimeError ("This should never happen, please file a bug with full stacktrace if encountered" )
445+ return True
417446
418447 @staticmethod
419448 def _convert_new_topic_request (new_topic ):
@@ -1337,10 +1366,60 @@ def _delete_consumer_groups_send_request(self, group_ids, group_coordinator_id):
13371366 .format (version ))
13381367 return self ._send_request_to_node (group_coordinator_id , request )
13391368
1369+ @staticmethod
1370+ def _convert_topic_partitions (topic_partitions ):
1371+ return [
1372+ (
1373+ topic ,
1374+ partition_ids
1375+ )
1376+ for topic , partition_ids in topic_partitions .items ()
1377+ ]
1378+
1379+ def _get_all_topic_partitions (self ):
1380+ return [
1381+ (
1382+ topic ,
1383+ [partition_info .partition for partition_info in self ._client .cluster ._partitions [topic ].values ()]
1384+ )
1385+ for topic in self ._client .cluster .topics ()
1386+ ]
1387+
1388+ def _get_topic_partitions (self , topic_partitions ):
1389+ if topic_partitions is None :
1390+ return self ._get_all_topic_partitions ()
1391+ return self ._convert_topic_partitions (topic_partitions )
1392+
1393+ def perform_leader_election (self , election_type , topic_partitions = None , timeout_ms = None ):
1394+ """Perform leader election on the topic partitions.
1395+
1396+ :param election_type: Type of election to attempt. 0 for Perferred, 1 for Unclean
1397+ :param topic_partitions: A map of topic name strings to partition ids list.
1398+ By default, will run on all topic partitions
1399+ :param timeout_ms: Milliseconds to wait for the leader election process to complete
1400+ before the broker returns.
1401+
1402+ :return: Appropriate version of ElectLeadersResponse class.
1403+ """
1404+ version = self ._matching_api_version (ElectLeadersRequest )
1405+ timeout_ms = self ._validate_timeout (timeout_ms )
1406+ if 0 < version <= 1 :
1407+ request = ElectLeadersRequest [version ](
1408+ election_type = ElectionType (election_type ),
1409+ topic_partitions = self ._get_topic_partitions (topic_partitions ),
1410+ timeout = timeout_ms ,
1411+ )
1412+ else :
1413+ raise NotImplementedError (
1414+ "Support for CreateTopics v{} has not yet been added to KafkaAdminClient."
1415+ .format (version ))
1416+ # TODO convert structs to a more pythonic interface
1417+ return self ._send_request_to_controller (request )
1418+
13401419 def _wait_for_futures (self , futures ):
13411420 while not all (future .succeeded () for future in futures ):
13421421 for future in futures :
13431422 self ._client .poll (future = future )
13441423
13451424 if future .failed ():
1346- raise future .exception # pylint: disable-msg=raising-bad-type
1425+ raise future .exception # pylint: disable-msg=raising-bad-type
0 commit comments