From 03c357f01c096fdbe20ed2f96a400d7b7fc4f2cb Mon Sep 17 00:00:00 2001 From: Matt Oberle Date: Wed, 18 Aug 2021 13:38:37 -0400 Subject: [PATCH 01/19] Support custom SASL mechanisms There is some interest in supporting various SASL mechanisms not currently included in the library: * #2110 (DMS) * #2204 (SSPI) * #2232 (AWS_MSK_IAM) Adding these mechanisms in the core library may be undesirable due to: * Increased maintenance burden. * Unavailable testing environments. * Vendor specificity. This commit provides a quick prototype for a pluggable SASL system. --- **Example** To define a custom SASL mechanism a module must implement two methods: ```py def validate_config(conn): # Check configuration values, available libraries, etc. assert conn.config['vendor_specific_setting'] is not None, ( 'vendor_specific_setting required when sasl_mechanism=MY_SASL' ) def try_authenticate(conn, future): # Do authentication routine and return resolved Future with failed # or succeeded state. ``` And then the custom mechanism should be registered before initializing a KafkaAdminClient, KafkaConsumer, or KafkaProducer: ```py import kafka.sasl from kafka import KafkaProducer import my_sasl kafka.sasl.register_mechanism('MY_SASL', my_sasl) producer = KafkaProducer(sasl_mechanism='MY_SASL') ``` --- **Notes** **ABCs** This prototype does not implement an ABC for custom SASL mechanisms. Using an ABC would reduce a few of the explicit assertions involved with registering a mechanism and is a viable option. Due to differing feature sets between py2/py3 this option was not explored, but shouldn't be difficult. **Private Methods** This prototype relies on some methods that are currently marked as **private** in `BrokerConnection`. * `._can_send_recv` * `._lock` * `._recv_bytes_blocking` * `._send_bytes_blocking` A pluggable system would require stable interfaces for these actions. **Alternative Approach** If the module-scoped dict modification in `register_mechanism` feels too clunky maybe the addtional mechanisms can be specified via an argument when initializing one of the `Kafka*` classes? --- kafka/conn.py | 274 +++----------------------------------- kafka/sasl/__init__.py | 53 ++++++++ kafka/sasl/gssapi.py | 100 ++++++++++++++ kafka/sasl/oauthbearer.py | 80 +++++++++++ kafka/sasl/plain.py | 58 ++++++++ kafka/sasl/scram.py | 68 ++++++++++ 6 files changed, 378 insertions(+), 255 deletions(-) create mode 100644 kafka/sasl/__init__.py create mode 100644 kafka/sasl/gssapi.py create mode 100644 kafka/sasl/oauthbearer.py create mode 100644 kafka/sasl/plain.py create mode 100644 kafka/sasl/scram.py diff --git a/kafka/conn.py b/kafka/conn.py index cac354875..c867eacf6 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -2,7 +2,6 @@ import copy import errno -import io import logging from random import shuffle, uniform @@ -14,25 +13,26 @@ from kafka.vendor import selectors34 as selectors import socket -import struct import threading import time from kafka.vendor import six +from kafka import sasl import kafka.errors as Errors from kafka.future import Future from kafka.metrics.stats import Avg, Count, Max, Rate -from kafka.oauth.abstract import AbstractTokenProvider -from kafka.protocol.admin import SaslHandShakeRequest, DescribeAclsRequest_v2, DescribeClientQuotasRequest +from kafka.protocol.admin import ( + DescribeAclsRequest_v2, + DescribeClientQuotasRequest, + SaslHandShakeRequest, +) from kafka.protocol.commit import OffsetFetchRequest from kafka.protocol.offset import OffsetRequest from kafka.protocol.produce import ProduceRequest from kafka.protocol.metadata import MetadataRequest from kafka.protocol.fetch import FetchRequest from kafka.protocol.parser import KafkaProtocol -from kafka.protocol.types import Int32, Int8 -from kafka.scram import ScramClient from kafka.version import __version__ @@ -83,6 +83,12 @@ class SSLWantWriteError(Exception): gssapi = None GSSError = None +# needed for AWS_MSK_IAM authentication: +try: + from botocore.session import Session as BotoSession +except ImportError: + # no botocore available, will disable AWS_MSK_IAM mechanism + BotoSession = None AFI_NAMES = { socket.AF_UNSPEC: "unspecified", @@ -227,7 +233,6 @@ class BrokerConnection(object): 'sasl_oauth_token_provider': None } SECURITY_PROTOCOLS = ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL') - SASL_MECHANISMS = ('PLAIN', 'GSSAPI', 'OAUTHBEARER', "SCRAM-SHA-256", "SCRAM-SHA-512") def __init__(self, host, port, afi, **configs): self.host = host @@ -260,22 +265,10 @@ def __init__(self, host, port, afi, **configs): assert ssl_available, "Python wasn't built with SSL support" if self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL'): - assert self.config['sasl_mechanism'] in self.SASL_MECHANISMS, ( - 'sasl_mechanism must be in ' + ', '.join(self.SASL_MECHANISMS)) - if self.config['sasl_mechanism'] in ('PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512'): - assert self.config['sasl_plain_username'] is not None, ( - 'sasl_plain_username required for PLAIN or SCRAM sasl' - ) - assert self.config['sasl_plain_password'] is not None, ( - 'sasl_plain_password required for PLAIN or SCRAM sasl' - ) - if self.config['sasl_mechanism'] == 'GSSAPI': - assert gssapi is not None, 'GSSAPI lib not available' - assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_kerberos_service_name required for GSSAPI sasl' - if self.config['sasl_mechanism'] == 'OAUTHBEARER': - token_provider = self.config['sasl_oauth_token_provider'] - assert token_provider is not None, 'sasl_oauth_token_provider required for OAUTHBEARER sasl' - assert callable(getattr(token_provider, "token", None)), 'sasl_oauth_token_provider must implement method #token()' + assert self.config['sasl_mechanism'] in sasl.MECHANISMS, ( + 'sasl_mechanism must be one of {}'.format(', '.join(sasl.MECHANISMS.keys())) + ) + sasl.MECHANISMS[self.config['sasl_mechanism']].validate_config(self) # This is not a general lock / this class is not generally thread-safe yet # However, to avoid pushing responsibility for maintaining # per-connection locks to the upstream client, we will use this lock to @@ -553,19 +546,9 @@ def _handle_sasl_handshake_response(self, future, response): Errors.UnsupportedSaslMechanismError( 'Kafka broker does not support %s sasl mechanism. Enabled mechanisms are: %s' % (self.config['sasl_mechanism'], response.enabled_mechanisms))) - elif self.config['sasl_mechanism'] == 'PLAIN': - return self._try_authenticate_plain(future) - elif self.config['sasl_mechanism'] == 'GSSAPI': - return self._try_authenticate_gssapi(future) - elif self.config['sasl_mechanism'] == 'OAUTHBEARER': - return self._try_authenticate_oauth(future) - elif self.config['sasl_mechanism'].startswith("SCRAM-SHA-"): - return self._try_authenticate_scram(future) - else: - return future.failure( - Errors.UnsupportedSaslMechanismError( - 'kafka-python does not support SASL mechanism %s' % - self.config['sasl_mechanism'])) + + try_authenticate = sasl.MECHANISMS[self.config['sasl_mechanism']].try_authenticate + return try_authenticate(self, future) def _send_bytes(self, data): """Send some data via non-blocking IO @@ -619,225 +602,6 @@ def _recv_bytes_blocking(self, n): finally: self._sock.settimeout(0.0) - def _try_authenticate_plain(self, future): - if self.config['security_protocol'] == 'SASL_PLAINTEXT': - log.warning('%s: Sending username and password in the clear', self) - - data = b'' - # Send PLAIN credentials per RFC-4616 - msg = bytes('\0'.join([self.config['sasl_plain_username'], - self.config['sasl_plain_username'], - self.config['sasl_plain_password']]).encode('utf-8')) - size = Int32.encode(len(msg)) - - err = None - close = False - with self._lock: - if not self._can_send_recv(): - err = Errors.NodeNotReadyError(str(self)) - close = False - else: - try: - self._send_bytes_blocking(size + msg) - - # The server will send a zero sized message (that is Int32(0)) on success. - # The connection is closed on failure - data = self._recv_bytes_blocking(4) - - except (ConnectionError, TimeoutError) as e: - log.exception("%s: Error receiving reply from server", self) - err = Errors.KafkaConnectionError("%s: %s" % (self, e)) - close = True - - if err is not None: - if close: - self.close(error=err) - return future.failure(err) - - if data != b'\x00\x00\x00\x00': - error = Errors.AuthenticationFailedError('Unrecognized response during authentication') - return future.failure(error) - - log.info('%s: Authenticated as %s via PLAIN', self, self.config['sasl_plain_username']) - return future.success(True) - - def _try_authenticate_scram(self, future): - if self.config['security_protocol'] == 'SASL_PLAINTEXT': - log.warning('%s: Exchanging credentials in the clear', self) - - scram_client = ScramClient( - self.config['sasl_plain_username'], self.config['sasl_plain_password'], self.config['sasl_mechanism'] - ) - - err = None - close = False - with self._lock: - if not self._can_send_recv(): - err = Errors.NodeNotReadyError(str(self)) - close = False - else: - try: - client_first = scram_client.first_message().encode('utf-8') - size = Int32.encode(len(client_first)) - self._send_bytes_blocking(size + client_first) - - (data_len,) = struct.unpack('>i', self._recv_bytes_blocking(4)) - server_first = self._recv_bytes_blocking(data_len).decode('utf-8') - scram_client.process_server_first_message(server_first) - - client_final = scram_client.final_message().encode('utf-8') - size = Int32.encode(len(client_final)) - self._send_bytes_blocking(size + client_final) - - (data_len,) = struct.unpack('>i', self._recv_bytes_blocking(4)) - server_final = self._recv_bytes_blocking(data_len).decode('utf-8') - scram_client.process_server_final_message(server_final) - - except (ConnectionError, TimeoutError) as e: - log.exception("%s: Error receiving reply from server", self) - err = Errors.KafkaConnectionError("%s: %s" % (self, e)) - close = True - - if err is not None: - if close: - self.close(error=err) - return future.failure(err) - - log.info( - '%s: Authenticated as %s via %s', self, self.config['sasl_plain_username'], self.config['sasl_mechanism'] - ) - return future.success(True) - - def _try_authenticate_gssapi(self, future): - kerberos_damin_name = self.config['sasl_kerberos_domain_name'] or self.host - auth_id = self.config['sasl_kerberos_service_name'] + '@' + kerberos_damin_name - gssapi_name = gssapi.Name( - auth_id, - name_type=gssapi.NameType.hostbased_service - ).canonicalize(gssapi.MechType.kerberos) - log.debug('%s: GSSAPI name: %s', self, gssapi_name) - - err = None - close = False - with self._lock: - if not self._can_send_recv(): - err = Errors.NodeNotReadyError(str(self)) - close = False - else: - # Establish security context and negotiate protection level - # For reference RFC 2222, section 7.2.1 - try: - # Exchange tokens until authentication either succeeds or fails - client_ctx = gssapi.SecurityContext(name=gssapi_name, usage='initiate') - received_token = None - while not client_ctx.complete: - # calculate an output token from kafka token (or None if first iteration) - output_token = client_ctx.step(received_token) - - # pass output token to kafka, or send empty response if the security - # context is complete (output token is None in that case) - if output_token is None: - self._send_bytes_blocking(Int32.encode(0)) - else: - msg = output_token - size = Int32.encode(len(msg)) - self._send_bytes_blocking(size + msg) - - # The server will send a token back. Processing of this token either - # establishes a security context, or it needs further token exchange. - # The gssapi will be able to identify the needed next step. - # The connection is closed on failure. - header = self._recv_bytes_blocking(4) - (token_size,) = struct.unpack('>i', header) - received_token = self._recv_bytes_blocking(token_size) - - # Process the security layer negotiation token, sent by the server - # once the security context is established. - - # unwraps message containing supported protection levels and msg size - msg = client_ctx.unwrap(received_token).message - # Kafka currently doesn't support integrity or confidentiality security layers, so we - # simply set QoP to 'auth' only (first octet). We reuse the max message size proposed - # by the server - msg = Int8.encode(SASL_QOP_AUTH & Int8.decode(io.BytesIO(msg[0:1]))) + msg[1:] - # add authorization identity to the response, GSS-wrap and send it - msg = client_ctx.wrap(msg + auth_id.encode(), False).message - size = Int32.encode(len(msg)) - self._send_bytes_blocking(size + msg) - - except (ConnectionError, TimeoutError) as e: - log.exception("%s: Error receiving reply from server", self) - err = Errors.KafkaConnectionError("%s: %s" % (self, e)) - close = True - except Exception as e: - err = e - close = True - - if err is not None: - if close: - self.close(error=err) - return future.failure(err) - - log.info('%s: Authenticated as %s via GSSAPI', self, gssapi_name) - return future.success(True) - - def _try_authenticate_oauth(self, future): - data = b'' - - msg = bytes(self._build_oauth_client_request().encode("utf-8")) - size = Int32.encode(len(msg)) - - err = None - close = False - with self._lock: - if not self._can_send_recv(): - err = Errors.NodeNotReadyError(str(self)) - close = False - else: - try: - # Send SASL OAuthBearer request with OAuth token - self._send_bytes_blocking(size + msg) - - # The server will send a zero sized message (that is Int32(0)) on success. - # The connection is closed on failure - data = self._recv_bytes_blocking(4) - - except (ConnectionError, TimeoutError) as e: - log.exception("%s: Error receiving reply from server", self) - err = Errors.KafkaConnectionError("%s: %s" % (self, e)) - close = True - - if err is not None: - if close: - self.close(error=err) - return future.failure(err) - - if data != b'\x00\x00\x00\x00': - error = Errors.AuthenticationFailedError('Unrecognized response during authentication') - return future.failure(error) - - log.info('%s: Authenticated via OAuth', self) - return future.success(True) - - def _build_oauth_client_request(self): - token_provider = self.config['sasl_oauth_token_provider'] - return "n,,\x01auth=Bearer {}{}\x01\x01".format(token_provider.token(), self._token_extensions()) - - def _token_extensions(self): - """ - Return a string representation of the OPTIONAL key-value pairs that can be sent with an OAUTHBEARER - initial request. - """ - token_provider = self.config['sasl_oauth_token_provider'] - - # Only run if the #extensions() method is implemented by the clients Token Provider class - # Builds up a string separated by \x01 via a dict of key value pairs - if callable(getattr(token_provider, "extensions", None)) and len(token_provider.extensions()) > 0: - msg = "\x01".join(["{}={}".format(k, v) for k, v in token_provider.extensions().items()]) - return "\x01" + msg - else: - return "" - def blacked_out(self): """ Return true if we are disconnected from the given node and can't diff --git a/kafka/sasl/__init__.py b/kafka/sasl/__init__.py new file mode 100644 index 000000000..52621830f --- /dev/null +++ b/kafka/sasl/__init__.py @@ -0,0 +1,53 @@ +import logging + +from kafka.sasl import gssapi, oauthbearer, plain, scram + +log = logging.getLogger(__name__) + +MECHANISMS = { + 'GSSAPI': gssapi, + 'OAUTHBEARER': oauthbearer, + 'PLAIN': plain, + 'SCRAM-SHA-256': scram, + 'SCRAM-SHA-512': scram, +} + + +def register_mechanism(key, module): + """ + Registers a custom SASL mechanism that can be used via sasl_mechanism={key}. + + Example: + import kakfa.sasl + from kafka import KafkaProducer + from mymodule import custom_sasl + kafka.sasl.register_mechanism('CUSTOM_SASL', custom_sasl) + + producer = KafkaProducer(sasl_mechanism='CUSTOM_SASL') + + Arguments: + key (str): The name of the mechanism returned by the broker and used + in the sasl_mechanism config value. + module (module): A module that implements the following methods... + + def validate_config(conn: BrokerConnection): -> None: + # Raises an AssertionError for missing or invalid conifg values. + + def try_authenticate(conn: BrokerConncetion, future: -> Future): + # Executes authentication routine and returns a resolved Future. + + Raises: + AssertionError: The registered module does not define a required method. + """ + assert callable(getattr(module, 'validate_config', None)), ( + 'Custom SASL mechanism {} must implement method #validate_config()' + .format(key) + ) + assert callable(getattr(module, 'try_authenticate', None)), ( + 'Custom SASL mechanism {} must implement method #try_authenticate()' + .format(key) + ) + if key in MECHANISMS: + log.warning('Overriding existing SASL mechanism {}'.format(key)) + + MECHANISMS[key] = module diff --git a/kafka/sasl/gssapi.py b/kafka/sasl/gssapi.py new file mode 100644 index 000000000..92a3ed954 --- /dev/null +++ b/kafka/sasl/gssapi.py @@ -0,0 +1,100 @@ +import io +import logging +import struct + +import kafka.errors as Errors +from kafka.protocol.types import Int8, Int32 + +try: + import gssapi + from gssapi.raw.misc import GSSError +except ImportError: + gssapi = None + GSSError = None + +log = logging.getLogger(__name__) + +SASL_QOP_AUTH = 1 + + +def validate_config(conn): + assert gssapi is not None, ( + 'gssapi library required when sasl_mechanism=GSSAPI' + ) + assert conn.config['sasl_kerberos_service_name'] is not None, ( + 'sasl_kerberos_service_name required when sasl_mechanism=GSSAPI' + ) + + +def try_authenticate(conn, future): + kerberos_damin_name = conn.config['sasl_kerberos_domain_name'] or conn.host + auth_id = conn.config['sasl_kerberos_service_name'] + '@' + kerberos_damin_name + gssapi_name = gssapi.Name( + auth_id, + name_type=gssapi.NameType.hostbased_service + ).canonicalize(gssapi.MechType.kerberos) + log.debug('%s: GSSAPI name: %s', conn, gssapi_name) + + err = None + close = False + with conn._lock: + if not conn._can_send_recv(): + err = Errors.NodeNotReadyError(str(conn)) + close = False + else: + # Establish security context and negotiate protection level + # For reference RFC 2222, section 7.2.1 + try: + # Exchange tokens until authentication either succeeds or fails + client_ctx = gssapi.SecurityContext(name=gssapi_name, usage='initiate') + received_token = None + while not client_ctx.complete: + # calculate an output token from kafka token (or None if first iteration) + output_token = client_ctx.step(received_token) + + # pass output token to kafka, or send empty response if the security + # context is complete (output token is None in that case) + if output_token is None: + conn._send_bytes_blocking(Int32.encode(0)) + else: + msg = output_token + size = Int32.encode(len(msg)) + conn._send_bytes_blocking(size + msg) + + # The server will send a token back. Processing of this token either + # establishes a security context, or it needs further token exchange. + # The gssapi will be able to identify the needed next step. + # The connection is closed on failure. + header = conn._recv_bytes_blocking(4) + (token_size,) = struct.unpack('>i', header) + received_token = conn._recv_bytes_blocking(token_size) + + # Process the security layer negotiation token, sent by the server + # once the security context is established. + + # unwraps message containing supported protection levels and msg size + msg = client_ctx.unwrap(received_token).message + # Kafka currently doesn't support integrity or confidentiality + # security layers, so we simply set QoP to 'auth' only (first octet). + # We reuse the max message size proposed by the server + msg = Int8.encode(SASL_QOP_AUTH & Int8.decode(io.BytesIO(msg[0:1]))) + msg[1:] + # add authorization identity to the response, GSS-wrap and send it + msg = client_ctx.wrap(msg + auth_id.encode(), False).message + size = Int32.encode(len(msg)) + conn._send_bytes_blocking(size + msg) + + except (ConnectionError, TimeoutError) as e: + log.exception("%s: Error receiving reply from server", conn) + err = Errors.KafkaConnectionError("%s: %s" % (conn, e)) + close = True + except Exception as e: + err = e + close = True + + if err is not None: + if close: + conn.close(error=err) + return future.failure(err) + + log.info('%s: Authenticated as %s via GSSAPI', conn, gssapi_name) + return future.success(True) diff --git a/kafka/sasl/oauthbearer.py b/kafka/sasl/oauthbearer.py new file mode 100644 index 000000000..f1427af9a --- /dev/null +++ b/kafka/sasl/oauthbearer.py @@ -0,0 +1,80 @@ +import logging + +import kafka.errors as Errors +from kafka.protocol.types import Int32 + +log = logging.getLogger(__name__) + + +def validate_config(conn): + token_provider = conn.config.get('sasl_oauth_token_provider') + assert token_provider is not None, ( + 'sasl_oauth_token_provider required when sasl_mechanism=OAUTHBEARER' + ) + assert callable(getattr(token_provider, 'token', None)), ( + 'sasl_oauth_token_provider must implement method #token()' + ) + + +def try_authenticate(conn, future): + data = b'' + + msg = bytes(_build_oauth_client_request(conn).encode("utf-8")) + size = Int32.encode(len(msg)) + + err = None + close = False + with conn._lock: + if not conn._can_send_recv(): + err = Errors.NodeNotReadyError(str(conn)) + close = False + else: + try: + # Send SASL OAuthBearer request with OAuth token + conn._send_bytes_blocking(size + msg) + + # The server will send a zero sized message (that is Int32(0)) on success. + # The connection is closed on failure + data = conn._recv_bytes_blocking(4) + + except (ConnectionError, TimeoutError) as e: + log.exception("%s: Error receiving reply from server", conn) + err = Errors.KafkaConnectionError("%s: %s" % (conn, e)) + close = True + + if err is not None: + if close: + conn.close(error=err) + return future.failure(err) + + if data != b'\x00\x00\x00\x00': + error = Errors.AuthenticationFailedError('Unrecognized response during authentication') + return future.failure(error) + + log.info('%s: Authenticated via OAuth', conn) + return future.success(True) + + +def _build_oauth_client_request(conn): + token_provider = conn.config['sasl_oauth_token_provider'] + return "n,,\x01auth=Bearer {}{}\x01\x01".format( + token_provider.token(), + _token_extensions(conn), + ) + + +def _token_extensions(conn): + """ + Return a string representation of the OPTIONAL key-value pairs that can be + sent with an OAUTHBEARER initial request. + """ + token_provider = conn.config['sasl_oauth_token_provider'] + + # Only run if the #extensions() method is implemented by the clients Token Provider class + # Builds up a string separated by \x01 via a dict of key value pairs + if (callable(getattr(token_provider, "extensions", None)) + and len(token_provider.extensions()) > 0): + msg = "\x01".join(["{}={}".format(k, v) for k, v in token_provider.extensions().items()]) + return "\x01" + msg + else: + return "" diff --git a/kafka/sasl/plain.py b/kafka/sasl/plain.py new file mode 100644 index 000000000..5aedcbbb9 --- /dev/null +++ b/kafka/sasl/plain.py @@ -0,0 +1,58 @@ +import logging + +import kafka.errors as Errors +from kafka.protocol.types import Int32 + +log = logging.getLogger(__name__) + + +def validate_config(conn): + assert conn.config['sasl_plain_username'] is not None, ( + 'sasl_plain_username required when sasl_mechanism=PLAIN' + ) + assert conn.config['sasl_plain_password'] is not None, ( + 'sasl_plain_password required when sasl_mechanism=PLAIN' + ) + + +def try_authenticate(conn, future): + if conn.config['security_protocol'] == 'SASL_PLAINTEXT': + log.warning('%s: Sending username and password in the clear', conn) + + data = b'' + # Send PLAIN credentials per RFC-4616 + msg = bytes('\0'.join([conn.config['sasl_plain_username'], + conn.config['sasl_plain_username'], + conn.config['sasl_plain_password']]).encode('utf-8')) + size = Int32.encode(len(msg)) + + err = None + close = False + with conn._lock: + if not conn._can_send_recv(): + err = Errors.NodeNotReadyError(str(conn)) + close = False + else: + try: + conn._send_bytes_blocking(size + msg) + + # The server will send a zero sized message (that is Int32(0)) on success. + # The connection is closed on failure + data = conn._recv_bytes_blocking(4) + + except (ConnectionError, TimeoutError) as e: + log.exception("%s: Error receiving reply from server", conn) + err = Errors.KafkaConnectionError("%s: %s" % (conn, e)) + close = True + + if err is not None: + if close: + conn.close(error=err) + return future.failure(err) + + if data != b'\x00\x00\x00\x00': + error = Errors.AuthenticationFailedError('Unrecognized response during authentication') + return future.failure(error) + + log.info('%s: Authenticated as %s via PLAIN', conn, conn.config['sasl_plain_username']) + return future.success(True) diff --git a/kafka/sasl/scram.py b/kafka/sasl/scram.py new file mode 100644 index 000000000..f31c80c1b --- /dev/null +++ b/kafka/sasl/scram.py @@ -0,0 +1,68 @@ +import logging +import struct + +import kafka.errors as Errors +from kafka.protocol.types import Int32 +from kafka.scram import ScramClient + +log = logging.getLogger() + + +def validate_config(conn): + assert conn.config['sasl_plain_username'] is not None, ( + 'sasl_plain_username required when sasl_mechanism=SCRAM-*' + ) + assert conn.config['sasl_plain_password'] is not None, ( + 'sasl_plain_password required when sasl_mechanism=SCRAM-*' + ) + + +def try_authenticate(conn, future): + if conn.config['security_protocol'] == 'SASL_PLAINTEXT': + log.warning('%s: Exchanging credentials in the clear', conn) + + scram_client = ScramClient( + conn.config['sasl_plain_username'], + conn.config['sasl_plain_password'], + conn.config['sasl_mechanism'], + ) + + err = None + close = False + with conn._lock: + if not conn._can_send_recv(): + err = Errors.NodeNotReadyError(str(conn)) + close = False + else: + try: + client_first = scram_client.first_message().encode('utf-8') + size = Int32.encode(len(client_first)) + conn._send_bytes_blocking(size + client_first) + + (data_len,) = struct.unpack('>i', conn._recv_bytes_blocking(4)) + server_first = conn._recv_bytes_blocking(data_len).decode('utf-8') + scram_client.process_server_first_message(server_first) + + client_final = scram_client.final_message().encode('utf-8') + size = Int32.encode(len(client_final)) + conn._send_bytes_blocking(size + client_final) + + (data_len,) = struct.unpack('>i', conn._recv_bytes_blocking(4)) + server_final = conn._recv_bytes_blocking(data_len).decode('utf-8') + scram_client.process_server_final_message(server_final) + + except (ConnectionError, TimeoutError) as e: + log.exception("%s: Error receiving reply from server", conn) + err = Errors.KafkaConnectionError("%s: %s" % (conn, e)) + close = True + + if err is not None: + if close: + conn.close(error=err) + return future.failure(err) + + log.info( + '%s: Authenticated as %s via %s', + conn, conn.config['sasl_plain_username'], conn.config['sasl_mechanism'] + ) + return future.success(True) From b95e46da7f22f2a226383691d5a5224e04463037 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Thu, 7 Mar 2024 10:31:28 -0500 Subject: [PATCH 02/19] Rename project from kafka-python to kafka-python-ng (#1) --- CHANGES.md | 2 +- Makefile | 2 +- README.rst | 40 ++++++++++++++++++++-------------------- docs/Makefile | 8 ++++---- docs/apidoc/modules.rst | 2 +- docs/changelog.rst | 2 +- docs/compatibility.rst | 18 +++++++++--------- docs/conf.py | 10 +++++----- docs/index.rst | 16 ++++++++-------- docs/install.rst | 20 ++++++++++---------- docs/license.rst | 6 +++--- docs/support.rst | 2 +- docs/tests.rst | 10 +++++----- kafka/version.py | 10 +++++++++- setup.py | 11 ++++++----- 15 files changed, 84 insertions(+), 75 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 097c55db6..ccec6b5c3 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -413,7 +413,7 @@ Some of the major changes include: * SASL authentication is working (we think) * Removed several circular references to improve gc on close() -Thanks to all contributors -- the state of the kafka-python community is strong! +Thanks to all contributors -- the state of the kafka-python-ng community is strong! Detailed changelog are listed below: diff --git a/Makefile b/Makefile index fc8fa5b21..9d7d89f4d 100644 --- a/Makefile +++ b/Makefile @@ -33,7 +33,7 @@ cov-local: build-integration @echo "open file://`pwd`/htmlcov/index.html" # Check the readme for syntax errors, which can lead to invalid formatting on -# PyPi homepage (https://pypi.python.org/pypi/kafka-python) +# PyPi homepage (https://pypi.python.org/pypi/kafka-python-ng) check-readme: python setup.py check -rms diff --git a/README.rst b/README.rst index 78a92a884..f185bf690 100644 --- a/README.rst +++ b/README.rst @@ -2,27 +2,27 @@ Kafka Python client ------------------------ .. image:: https://img.shields.io/badge/kafka-2.6%2C%202.5%2C%202.4%2C%202.3%2C%202.2%2C%202.1%2C%202.0%2C%201.1%2C%201.0%2C%200.11%2C%200.10%2C%200.9%2C%200.8-brightgreen.svg - :target: https://kafka-python.readthedocs.io/en/master/compatibility.html -.. image:: https://img.shields.io/pypi/pyversions/kafka-python.svg - :target: https://pypi.python.org/pypi/kafka-python -.. image:: https://coveralls.io/repos/dpkp/kafka-python/badge.svg?branch=master&service=github - :target: https://coveralls.io/github/dpkp/kafka-python?branch=master + :target: https://kafka-python-ng.readthedocs.io/en/master/compatibility.html +.. image:: https://img.shields.io/pypi/pyversions/kafka-python-ng.svg + :target: https://pypi.python.org/pypi/kafka-python-ng +.. image:: https://coveralls.io/repos/wbarnha/kafka-python-ng-ng/badge.svg?branch=master&service=github + :target: https://coveralls.io/github/wbarnha/kafka-python-ng-ng?branch=master .. image:: https://img.shields.io/badge/license-Apache%202-blue.svg - :target: https://github.com/dpkp/kafka-python/blob/master/LICENSE -.. image:: https://img.shields.io/pypi/dw/kafka-python.svg + :target: https://github.com/wbarnha/kafka-python-ng-ng/blob/master/LICENSE +.. image:: https://img.shields.io/pypi/dw/kafka-python-ng.svg :target: https://pypistats.org/packages/kafka-python .. image:: https://img.shields.io/pypi/v/kafka-python.svg :target: https://pypi.org/project/kafka-python .. image:: https://img.shields.io/pypi/implementation/kafka-python - :target: https://github.com/dpkp/kafka-python/blob/master/setup.py + :target: https://github.com/wbarnha/kafka-python-ng/blob/master/setup.py Python client for the Apache Kafka distributed stream processing system. -kafka-python is designed to function much like the official java client, with a +kafka-python-ng is designed to function much like the official java client, with a sprinkling of pythonic interfaces (e.g., consumer iterators). -kafka-python is best used with newer brokers (0.9+), but is backwards-compatible with +kafka-python-ng is best used with newer brokers (0.9+), but is backwards-compatible with older versions (to 0.8.0). Some features will only be enabled on newer brokers. For example, fully coordinated consumer groups -- i.e., dynamic partition assignment to multiple consumers in the same group -- requires use of 0.9+ kafka @@ -32,13 +32,13 @@ check code (perhaps using zookeeper or consul). For older brokers, you can achieve something similar by manually assigning different partitions to each consumer instance with config management tools like chef, ansible, etc. This approach will work fine, though it does not support rebalancing on failures. -See +See for more details. Please note that the master branch may contain unreleased features. For release documentation, please see readthedocs and/or python's inline help. ->>> pip install kafka-python +>>> pip install kafka-python-ng KafkaConsumer @@ -48,7 +48,7 @@ KafkaConsumer is a high-level message consumer, intended to operate as similarly as possible to the official java client. Full support for coordinated consumer groups requires use of kafka brokers that support the Group APIs: kafka v0.9+. -See +See for API and configuration details. The consumer iterator returns ConsumerRecords, which are simple namedtuples @@ -91,7 +91,7 @@ KafkaProducer KafkaProducer is a high-level, asynchronous message producer. The class is intended to operate as similarly as possible to the official java client. -See +See for more details. >>> from kafka import KafkaProducer @@ -146,7 +146,7 @@ multiprocessing is recommended. Compression *********** -kafka-python supports the following compression formats: +kafka-python-ng supports the following compression formats: - gzip - LZ4 @@ -154,23 +154,23 @@ kafka-python supports the following compression formats: - Zstandard (zstd) gzip is supported natively, the others require installing additional libraries. -See for more information. +See for more information. Optimized CRC32 Validation ************************** -Kafka uses CRC32 checksums to validate messages. kafka-python includes a pure +Kafka uses CRC32 checksums to validate messages. kafka-python-ng includes a pure python implementation for compatibility. To improve performance for high-throughput -applications, kafka-python will use `crc32c` for optimized native code if installed. -See for installation instructions. +applications, kafka-python-ng will use `crc32c` for optimized native code if installed. +See for installation instructions. See https://pypi.org/project/crc32c/ for details on the underlying crc32c lib. Protocol ******** -A secondary goal of kafka-python is to provide an easy-to-use protocol layer +A secondary goal of kafka-python-ng is to provide an easy-to-use protocol layer for interacting with kafka brokers via the python repl. This is useful for testing, probing, and general experimentation. The protocol support is leveraged to enable a KafkaClient.check_version() method that diff --git a/docs/Makefile b/docs/Makefile index b27cf7742..31e74e1aa 100644 --- a/docs/Makefile +++ b/docs/Makefile @@ -85,17 +85,17 @@ qthelp: @echo @echo "Build finished; now you can run "qcollectiongenerator" with the" \ ".qhcp project file in $(BUILDDIR)/qthelp, like this:" - @echo "# qcollectiongenerator $(BUILDDIR)/qthelp/kafka-python.qhcp" + @echo "# qcollectiongenerator $(BUILDDIR)/qthelp/kafka-python-ng.qhcp" @echo "To view the help file:" - @echo "# assistant -collectionFile $(BUILDDIR)/qthelp/kafka-python.qhc" + @echo "# assistant -collectionFile $(BUILDDIR)/qthelp/kafka-python-ng.qhc" devhelp: $(SPHINXBUILD) -b devhelp $(ALLSPHINXOPTS) $(BUILDDIR)/devhelp @echo @echo "Build finished." @echo "To view the help file:" - @echo "# mkdir -p $$HOME/.local/share/devhelp/kafka-python" - @echo "# ln -s $(BUILDDIR)/devhelp $$HOME/.local/share/devhelp/kafka-python" + @echo "# mkdir -p $$HOME/.local/share/devhelp/kafka-python-ng" + @echo "# ln -s $(BUILDDIR)/devhelp $$HOME/.local/share/devhelp/kafka-python-ng" @echo "# devhelp" epub: diff --git a/docs/apidoc/modules.rst b/docs/apidoc/modules.rst index 066fc6523..29be3486f 100644 --- a/docs/apidoc/modules.rst +++ b/docs/apidoc/modules.rst @@ -1,4 +1,4 @@ -kafka-python API +kafka-python-ng API **************** .. toctree:: diff --git a/docs/changelog.rst b/docs/changelog.rst index 446b29021..9d3cb6512 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -484,7 +484,7 @@ Some of the major changes include: * SASL authentication is working (we think) * Removed several circular references to improve gc on close() -Thanks to all contributors -- the state of the kafka-python community is strong! +Thanks to all contributors -- the state of the kafka-python-ng community is strong! Detailed changelog are listed below: diff --git a/docs/compatibility.rst b/docs/compatibility.rst index b3ad00634..e8e1342c3 100644 --- a/docs/compatibility.rst +++ b/docs/compatibility.rst @@ -2,20 +2,20 @@ Compatibility ------------- .. image:: https://img.shields.io/badge/kafka-2.6%2C%202.5%2C%202.4%2C%202.3%2C%202.2%2C%202.1%2C%202.0%2C%201.1%2C%201.0%2C%200.11%2C%200.10%2C%200.9%2C%200.8-brightgreen.svg - :target: https://kafka-python.readthedocs.io/compatibility.html -.. image:: https://img.shields.io/pypi/pyversions/kafka-python.svg - :target: https://pypi.python.org/pypi/kafka-python + :target: https://kafka-python-ng.readthedocs.io/compatibility.html +.. image:: https://img.shields.io/pypi/pyversions/kafka-python-ng.svg + :target: https://pypi.python.org/pypi/kafka-python-ng -kafka-python is compatible with (and tested against) broker versions 2.6 -through 0.8.0 . kafka-python is not compatible with the 0.8.2-beta release. +kafka-python-ng is compatible with (and tested against) broker versions 2.6 +through 0.8.0 . kafka-python-ng is not compatible with the 0.8.2-beta release. -Because the kafka server protocol is backwards compatible, kafka-python is +Because the kafka server protocol is backwards compatible, kafka-python-ng is expected to work with newer broker releases as well. -Although kafka-python is tested and expected to work on recent broker versions, +Although kafka-python-ng is tested and expected to work on recent broker versions, not all features are supported. Specifically, authentication codecs, and transactional producer/consumer support are not fully implemented. PRs welcome! -kafka-python is tested on python 2.7, 3.4, 3.7, 3.8 and pypy2.7. +kafka-python-ng is tested on python 2.7, 3.4, 3.7, 3.8 and pypy2.7. -Builds and tests via Travis-CI. See https://travis-ci.org/dpkp/kafka-python +Builds and tests via Travis-CI. See https://travis-ci.org/wbarnha/kafka-python-ng diff --git a/docs/conf.py b/docs/conf.py index efa8d0807..e5b013b0d 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # -# kafka-python documentation build configuration file, created by +# kafka-python-ng documentation build configuration file, created by # sphinx-quickstart on Sun Jan 4 12:21:50 2015. # # This file is execfile()d with the current directory set to its @@ -47,7 +47,7 @@ master_doc = 'index' # General information about the project. -project = u'kafka-python' +project = u'kafka-python-ng' copyright = u'2016 -- Dana Powers, David Arthur, and Contributors' # The version info for the project you're documenting, acts as replacement for @@ -201,7 +201,7 @@ # (source start file, target name, title, # author, documentclass [howto, manual, or own class]). latex_documents = [ - ('index', 'kafka-python.tex', u'kafka-python Documentation', + ('index', 'kafka-python-ng.tex', u'kafka-python-ng Documentation', u'Dana Powers', 'manual'), ] @@ -231,7 +231,7 @@ # One entry per manual page. List of tuples # (source start file, name, description, authors, manual section). man_pages = [ - ('index', 'kafka-python', u'kafka-python Documentation', + ('index', 'kafka-python-ng', u'kafka-python-ng Documentation', [u'Dana Powers'], 1) ] @@ -245,7 +245,7 @@ # (source start file, target name, title, author, # dir menu entry, description, category) texinfo_documents = [ - ('index', 'kafka-python', u'kafka-python Documentation', + ('index', 'kafka-python-ng', u'kafka-python-ng Documentation', u'Dana Powers', 'kafka-python', 'One line description of project.', 'Miscellaneous'), ] diff --git a/docs/index.rst b/docs/index.rst index 91e5086cc..92b998d92 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -1,16 +1,16 @@ -kafka-python +kafka-python-ng ############ .. image:: https://img.shields.io/badge/kafka-2.6%2C%202.5%2C%202.4%2C%202.3%2C%202.2%2C%202.1%2C%202.0%2C%201.1%2C%201.0%2C%200.11%2C%200.10%2C%200.9%2C%200.8-brightgreen.svg :target: https://kafka-python.readthedocs.io/en/master/compatibility.html .. image:: https://img.shields.io/pypi/pyversions/kafka-python.svg - :target: https://pypi.python.org/pypi/kafka-python -.. image:: https://coveralls.io/repos/dpkp/kafka-python/badge.svg?branch=master&service=github - :target: https://coveralls.io/github/dpkp/kafka-python?branch=master -.. image:: https://travis-ci.org/dpkp/kafka-python.svg?branch=master - :target: https://travis-ci.org/dpkp/kafka-python + :target: https://pypi.python.org/pypi/kafka-python-ng +.. image:: https://coveralls.io/repos/wbarnha/kafka-python-ng/badge.svg?branch=master&service=github + :target: https://coveralls.io/github/wbarnha/kafka-python-ng?branch=master +.. image:: https://travis-ci.org/wbarnha/kafka-python-ng.svg?branch=master + :target: https://travis-ci.org/wbarnha/kafka-python-ng .. image:: https://img.shields.io/badge/license-Apache%202-blue.svg - :target: https://github.com/dpkp/kafka-python/blob/master/LICENSE + :target: https://github.com/wbarnha/kafka-python-ng/blob/master/LICENSE Python client for the Apache Kafka distributed stream processing system. kafka-python is designed to function much like the official java client, with a @@ -31,7 +31,7 @@ failures. See `Compatibility `_ for more details. Please note that the master branch may contain unreleased features. For release documentation, please see readthedocs and/or python's inline help. ->>> pip install kafka-python +>>> pip install kafka-python-ng KafkaConsumer diff --git a/docs/install.rst b/docs/install.rst index 19901ee29..6ed917cd4 100644 --- a/docs/install.rst +++ b/docs/install.rst @@ -9,9 +9,9 @@ Pip: .. code:: bash - pip install kafka-python + pip install kafka-python-ng -Releases are also listed at https://github.com/dpkp/kafka-python/releases +Releases are also listed at https://github.com/wbarnha/kafka-python-ng/releases Bleeding-Edge @@ -19,21 +19,21 @@ Bleeding-Edge .. code:: bash - git clone https://github.com/dpkp/kafka-python - pip install ./kafka-python + git clone https://github.com/wbarnha/kafka-python-ng + pip install ./kafka-python-ng Optional crc32c install *********************** -Highly recommended if you are using Kafka 11+ brokers. For those `kafka-python` +Highly recommended if you are using Kafka 11+ brokers. For those `kafka-python-ng` uses a new message protocol version, that requires calculation of `crc32c`, -which differs from the `zlib.crc32` hash implementation. By default `kafka-python` +which differs from the `zlib.crc32` hash implementation. By default `kafka-python-ng` calculates it in pure python, which is quite slow. To speed it up we optionally support https://pypi.python.org/pypi/crc32c package if it's installed. .. code:: bash - pip install 'kafka-python[crc32c]' + pip install 'kafka-python-ng[crc32c]' Optional ZSTD install @@ -41,7 +41,7 @@ Optional ZSTD install To enable ZSTD compression/decompression, install python-zstandard: ->>> pip install 'kafka-python[zstd]' +>>> pip install 'kafka-python-ng[zstd]' Optional LZ4 install @@ -49,7 +49,7 @@ Optional LZ4 install To enable LZ4 compression/decompression, install python-lz4: ->>> pip install 'kafka-python[lz4]' +>>> pip install 'kafka-python-ng[lz4]' Optional Snappy install @@ -90,4 +90,4 @@ Install the `python-snappy` module .. code:: bash - pip install 'kafka-python[snappy]' + pip install 'kafka-python-ng[snappy]' diff --git a/docs/license.rst b/docs/license.rst index e9d5c9adb..016a916ba 100644 --- a/docs/license.rst +++ b/docs/license.rst @@ -2,9 +2,9 @@ License ------- .. image:: https://img.shields.io/badge/license-Apache%202-blue.svg - :target: https://github.com/dpkp/kafka-python/blob/master/LICENSE + :target: https://github.com/wbarnha/kafka-python-ng/blob/master/LICENSE -Apache License, v2.0. See `LICENSE `_. +Apache License, v2.0. See `LICENSE `_. Copyright 2016, Dana Powers, David Arthur, and Contributors -(See `AUTHORS `_). +(See `AUTHORS `_). diff --git a/docs/support.rst b/docs/support.rst index 63d4a86a2..25014b3fd 100644 --- a/docs/support.rst +++ b/docs/support.rst @@ -1,7 +1,7 @@ Support ------- -For support, see github issues at https://github.com/dpkp/kafka-python +For support, see github issues at https://github.com/wbarnha/kafka-python-ng Limited IRC chat at #kafka-python on freenode (general chat is #apache-kafka). diff --git a/docs/tests.rst b/docs/tests.rst index 561179ca5..763c2e54d 100644 --- a/docs/tests.rst +++ b/docs/tests.rst @@ -1,17 +1,17 @@ Tests ===== -.. image:: https://coveralls.io/repos/dpkp/kafka-python/badge.svg?branch=master&service=github - :target: https://coveralls.io/github/dpkp/kafka-python?branch=master -.. image:: https://travis-ci.org/dpkp/kafka-python.svg?branch=master - :target: https://travis-ci.org/dpkp/kafka-python +.. image:: https://coveralls.io/repos/wbarnha/kafka-python-ng/badge.svg?branch=master&service=github + :target: https://coveralls.io/github/wbarnha/kafka-python-ng?branch=master +.. image:: https://travis-ci.org/wbarnha/kafka-python-ng.svg?branch=master + :target: https://travis-ci.org/wbarnha/kafka-python-ng Test environments are managed via tox. The test suite is run via pytest. Linting is run via pylint, but is generally skipped on pypy due to pylint compatibility / performance issues. -For test coverage details, see https://coveralls.io/github/dpkp/kafka-python +For test coverage details, see https://coveralls.io/github/wbarnha/kafka-python-ng The test suite includes unit tests that mock network interfaces, as well as integration tests that setup and teardown kafka broker (and zookeeper) diff --git a/kafka/version.py b/kafka/version.py index 06306bd1f..8a26a1868 100644 --- a/kafka/version.py +++ b/kafka/version.py @@ -1 +1,9 @@ -__version__ = '2.0.3-dev' +import sys + +if sys.version_info < (3, 8): + from importlib_metadata import version +else: + from importlib.metadata import version + + +__version__ = version("kafka-python-ng") diff --git a/setup.py b/setup.py index 77043da04..75d2b8bcb 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,6 @@ # Pull version from source without importing # since we can't import something we haven't built yet :) -exec(open('kafka/version.py').read()) class Tox(Command): @@ -32,9 +31,9 @@ def run(cls): README = f.read() setup( - name="kafka-python", - version=__version__, - + name="kafka-python-ng", + use_scm_version=True, + setup_requires=["setuptools_scm"], tests_require=test_require, extras_require={ "crc32c": ["crc32c"], @@ -46,7 +45,9 @@ def run(cls): packages=find_packages(exclude=['test']), author="Dana Powers", author_email="dana.powers@gmail.com", - url="https://github.com/dpkp/kafka-python", + maintainer="William Barnhart", + maintainer_email="williambbarnhart@gmail.com", + url="https://github.com/wbarnha/kafka-python-ng", license="Apache License 2.0", description="Pure Python client for Apache Kafka", long_description=README, From 78c74c095172ec30df875bbf0861aeee4ed441c8 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Thu, 7 Mar 2024 11:14:36 -0500 Subject: [PATCH 03/19] Fix artifact downloads for release --- .github/workflows/python-package.yml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 59ad718cf..cee99ef5c 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -41,7 +41,7 @@ jobs: - name: Build artifacts run: python -m build - name: Upload built artifacts for testing - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: ${{ env.sdist-artifact }} # NOTE: Exact expected file names are specified here @@ -187,11 +187,11 @@ jobs: environment: pypi if: github.event_name == 'release' && github.event.action == 'created' steps: - - name: Download the sdist artifact - uses: actions/download-artifact@v3 + - name: Download the artifacts + uses: actions/download-artifact@v4 with: - name: artifact - path: dist + name: ${{ env.sdist-artifact }} + path: dist/${{ env.sdist-name }} - name: Publish package to PyPI uses: pypa/gh-action-pypi-publish@release/v1 with: From e7960197430d80bbd9aa8ab1d4c4d8c67f0cf624 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Fri, 8 Mar 2024 14:02:49 -0500 Subject: [PATCH 04/19] Fix badge links in README.rst --- README.rst | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/README.rst b/README.rst index f185bf690..8a5c71b38 100644 --- a/README.rst +++ b/README.rst @@ -5,15 +5,15 @@ Kafka Python client :target: https://kafka-python-ng.readthedocs.io/en/master/compatibility.html .. image:: https://img.shields.io/pypi/pyversions/kafka-python-ng.svg :target: https://pypi.python.org/pypi/kafka-python-ng -.. image:: https://coveralls.io/repos/wbarnha/kafka-python-ng-ng/badge.svg?branch=master&service=github - :target: https://coveralls.io/github/wbarnha/kafka-python-ng-ng?branch=master +.. image:: https://coveralls.io/repos/wbarnha/kafka-python-ng/badge.svg?branch=master&service=github + :target: https://coveralls.io/github/wbarnha/kafka-python-ng?branch=master .. image:: https://img.shields.io/badge/license-Apache%202-blue.svg - :target: https://github.com/wbarnha/kafka-python-ng-ng/blob/master/LICENSE + :target: https://github.com/wbarnha/kafka-python-ng/blob/master/LICENSE .. image:: https://img.shields.io/pypi/dw/kafka-python-ng.svg - :target: https://pypistats.org/packages/kafka-python + :target: https://pypistats.org/packages/kafka-python-ng .. image:: https://img.shields.io/pypi/v/kafka-python.svg - :target: https://pypi.org/project/kafka-python -.. image:: https://img.shields.io/pypi/implementation/kafka-python + :target: https://pypi.org/project/kafka-python-ng +.. image:: https://img.shields.io/pypi/implementation/kafka-python-ng :target: https://github.com/wbarnha/kafka-python-ng/blob/master/setup.py From 38e159a1863dba4c3dc36f0338f6e74f4abaa7c2 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Fri, 8 Mar 2024 18:06:20 -0500 Subject: [PATCH 05/19] Reconfigure tests to complete in a more timely manner and skip some iterations for Kafka 0.8.2 and Python 3.12 (#159) * skip failing tests for PyPy since they work locally * Reconfigure tests for PyPy and 3.12 * Skip partitioner tests in test_partitioner.py if 3.12 and 0.8.2 * Update test_partitioner.py * Update test_producer.py * Timeout tests after ten minutes * Set 0.8.2.2 to be experimental from hereon * Formally support PyPy 3.9 --- .github/workflows/python-package.yml | 10 +++++++--- test/test_admin_integration.py | 5 +++++ test/test_consumer_group.py | 4 ++++ test/test_partitioner.py | 1 + test/test_producer.py | 6 +++++- 5 files changed, 22 insertions(+), 4 deletions(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index cee99ef5c..ef49b4e58 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -66,10 +66,9 @@ jobs: - "3.10" - "3.11" - "3.12" + - "pypy3.9" experimental: [ false ] include: - - python-version: "pypy3.9" - experimental: true - python-version: "~3.13.0-0" experimental: true steps: @@ -115,11 +114,11 @@ jobs: needs: - build-sdist runs-on: ubuntu-latest + timeout-minutes: 10 strategy: fail-fast: false matrix: kafka-version: - - "0.8.2.2" - "0.9.0.1" - "0.10.2.2" - "0.11.0.2" @@ -128,6 +127,11 @@ jobs: - "2.4.0" - "2.5.0" - "2.6.0" + experimental: [false] + include: + - kafka-version: '0.8.2.2' + experimental: true + continue-on-error: ${{ matrix.experimental }} steps: - name: Checkout the source code uses: actions/checkout@v4 diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py index 06c40a223..283023049 100644 --- a/test/test_admin_integration.py +++ b/test/test_admin_integration.py @@ -1,3 +1,5 @@ +import platform + import pytest from logging import info @@ -151,6 +153,9 @@ def test_describe_consumer_group_does_not_exist(kafka_admin_client): group_description = kafka_admin_client.describe_consumer_groups(['test']) +@pytest.mark.skipif( + platform.python_implementation() == "PyPy", reason="Works on PyPy if run locally, but not in CI/CD pipeline." +) @pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11') def test_describe_consumer_group_exists(kafka_admin_client, kafka_consumer_factory, topic): """Tests that the describe consumer group call returns valid consumer group information diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py index 58dc7ebf9..4904ffeea 100644 --- a/test/test_consumer_group.py +++ b/test/test_consumer_group.py @@ -1,5 +1,6 @@ import collections import logging +import platform import threading import time @@ -40,6 +41,9 @@ def test_consumer_topics(kafka_broker, topic): consumer.close() +@pytest.mark.skipif( + platform.python_implementation() == "PyPy", reason="Works on PyPy if run locally, but not in CI/CD pipeline." +) @pytest.mark.skipif(env_kafka_version() < (0, 9), reason='Unsupported Kafka Version') def test_group(kafka_broker, topic): num_partitions = 4 diff --git a/test/test_partitioner.py b/test/test_partitioner.py index 853fbf69e..09fa0412a 100644 --- a/test/test_partitioner.py +++ b/test/test_partitioner.py @@ -2,6 +2,7 @@ import pytest + from kafka.partitioner import DefaultPartitioner, murmur2 diff --git a/test/test_producer.py b/test/test_producer.py index 7263130d1..15c244113 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -1,5 +1,6 @@ import gc import platform +import sys import time import threading @@ -10,6 +11,7 @@ from test.testutil import env_kafka_version, random_string +@pytest.mark.skipif(env_kafka_version() <= (0, 8, 2) and sys.version_info > (3, 11), reason="Kafka 0.8.2 and earlier not supported by 3.12") def test_buffer_pool(): pool = SimpleBufferPool(1000, 1000) @@ -21,8 +23,8 @@ def test_buffer_pool(): buf2 = pool.allocate(1000, 1000) assert buf2.read() == b'' - @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") +@pytest.mark.skipif(env_kafka_version() <= (0, 8, 2) and sys.version_info > (3, 11), reason="Kafka 0.8.2 and earlier not supported by 3.12") @pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd']) def test_end_to_end(kafka_broker, compression): if compression == 'lz4': @@ -70,6 +72,7 @@ def test_end_to_end(kafka_broker, compression): @pytest.mark.skipif(platform.python_implementation() != 'CPython', reason='Test relies on CPython-specific gc policies') +@pytest.mark.skipif(env_kafka_version() <= (0, 8, 2) and sys.version_info > (3, 11), reason="Kafka 0.8.2 and earlier not supported by 3.12") def test_kafka_producer_gc_cleanup(): gc.collect() threads = threading.active_count() @@ -81,6 +84,7 @@ def test_kafka_producer_gc_cleanup(): @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") +@pytest.mark.skipif(env_kafka_version() <= (0, 8, 2) and sys.version_info > (3, 11), reason="Kafka 0.8.2 and earlier not supported by 3.12") @pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd']) def test_kafka_producer_proper_record_metadata(kafka_broker, compression): if compression == 'zstd' and env_kafka_version() < (2, 1, 0): From e76232106e4129d5ab2e279f546791446260b6b7 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Fri, 8 Mar 2024 19:49:57 -0500 Subject: [PATCH 06/19] Test Kafka 0.8.2.2 using Python 3.10 in the meantime (#161) * Test Kafka 0.8.2.2 using Python 3.11 in the meantime * Override PYTHON_LATEST conditionally in python-package.yml * Update python-package.yml * add python annotation to kafka version test matrix * Update python-package.yml * try python 3.10 --- .github/workflows/python-package.yml | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index ef49b4e58..d39eeccf8 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -110,7 +110,7 @@ jobs: KAFKA_VERSION: ${{ env.KAFKA_LATEST }} test-kafka: - name: Tests for Kafka ${{ matrix.kafka-version }} + name: Tests for Kafka ${{ matrix.kafka-version }} (Python ${{ matrix.python-version }}) needs: - build-sdist runs-on: ubuntu-latest @@ -127,10 +127,17 @@ jobs: - "2.4.0" - "2.5.0" - "2.6.0" + python-version: ['3.12'] experimental: [false] include: - kafka-version: '0.8.2.2' experimental: true + python-version: "3.12" + - kafka-version: '0.8.2.2' + experimental: false + python-version: "3.10" + env: + PYTHON_LATEST: ${{ matrix.python-version }} continue-on-error: ${{ matrix.experimental }} steps: - name: Checkout the source code @@ -145,7 +152,7 @@ jobs: - name: Set up Python uses: actions/setup-python@v5 with: - python-version: ${{ env.PYTHON_LATEST }} + python-version: ${{ matrix.python-version }} cache: pip cache-dependency-path: | requirements-dev.txt From 00750aaa2eb7fe568e15d995d0bca12700bf94b9 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Fri, 8 Mar 2024 20:09:44 -0500 Subject: [PATCH 07/19] Remove support for EOL'ed versions of Python (#160) * Remove support for EOL'ed versions of Python * Update setup.py --- setup.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/setup.py b/setup.py index 75d2b8bcb..dd4e5de90 100644 --- a/setup.py +++ b/setup.py @@ -32,6 +32,7 @@ def run(cls): setup( name="kafka-python-ng", + python_requires=">=3.8", use_scm_version=True, setup_requires=["setuptools_scm"], tests_require=test_require, @@ -60,13 +61,6 @@ def run(cls): "Intended Audience :: Developers", "License :: OSI Approved :: Apache Software License", "Programming Language :: Python", - "Programming Language :: Python :: 2", - "Programming Language :: Python :: 2.7", - "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.4", - "Programming Language :: Python :: 3.5", - "Programming Language :: Python :: 3.6", - "Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", From 5bd1323d3ed4d44081592d6518435fc89724318a Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Fri, 8 Mar 2024 20:21:06 -0500 Subject: [PATCH 08/19] Stop testing Python 3.13 in python-package.yml (#162) Too many MRs to review... so little time. --- .github/workflows/python-package.yml | 3 --- 1 file changed, 3 deletions(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index d39eeccf8..df0e4e489 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -68,9 +68,6 @@ jobs: - "3.12" - "pypy3.9" experimental: [ false ] - include: - - python-version: "~3.13.0-0" - experimental: true steps: - name: Checkout the source code uses: actions/checkout@v4 From cda8f81ae16ee087cd039a4277be13f2004d4d09 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Fri, 8 Mar 2024 20:44:41 -0500 Subject: [PATCH 09/19] Avoid 100% CPU usage while socket is closed (#156) After stop/start kafka service, kafka-python may use 100% CPU caused by busy-retry while the socket was closed. This fix the issue by unregister the socket if the fd is negative. Co-authored-by: Orange Kao --- kafka/client_async.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/kafka/client_async.py b/kafka/client_async.py index 58f22d4ec..530a1f441 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -637,6 +637,9 @@ def _poll(self, timeout): self._sensors.select_time.record((end_select - start_select) * 1000000000) for key, events in ready: + if key.fileobj.fileno() < 0: + self._selector.unregister(key.fileobj) + if key.fileobj is self._wake_r: self._clear_wake_fd() continue From c02df0899f3b6a1e71f05fd91ff506b1b0c4c3eb Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Fri, 8 Mar 2024 22:45:43 -0500 Subject: [PATCH 10/19] Fix DescribeConfigsResponse_v1 config_source (#150) Co-authored-by: Ryar Nyah --- kafka/protocol/admin.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index f9d61e5cd..41b4a9576 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -719,7 +719,7 @@ class DescribeConfigsResponse_v1(Response): ('config_names', String('utf-8')), ('config_value', String('utf-8')), ('read_only', Boolean), - ('is_default', Boolean), + ('config_source', Int8), ('is_sensitive', Boolean), ('config_synonyms', Array( ('config_name', String('utf-8')), From 65eacfb2e6900d789ad501c6d5c81fcbc420ed16 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Sat, 9 Mar 2024 23:15:36 -0500 Subject: [PATCH 11/19] Fix base class of DescribeClientQuotasResponse_v0 (#144) Co-authored-by: Denis Otkidach --- kafka/protocol/admin.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index 41b4a9576..0bb1a7acc 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -925,7 +925,7 @@ class DeleteGroupsRequest_v1(Request): ] -class DescribeClientQuotasResponse_v0(Request): +class DescribeClientQuotasResponse_v0(Response): API_KEY = 48 API_VERSION = 0 SCHEMA = Schema( From e0ebe5dd3191778b35967b3a0dd22ca6e091a9b7 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Sat, 9 Mar 2024 23:16:53 -0500 Subject: [PATCH 12/19] Update license_file to license_files (#131) The former has been deprecated since setuptools 56 Co-authored-by: micwoj92 <45581170+micwoj92@users.noreply.github.com> --- setup.cfg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.cfg b/setup.cfg index 5c6311daf..76daa0897 100644 --- a/setup.cfg +++ b/setup.cfg @@ -2,4 +2,4 @@ universal=1 [metadata] -license_file = LICENSE +license_files = LICENSE From 26bb3eb6534cc1aff0a2a7751de7f7dedfdd4cd5 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Sat, 9 Mar 2024 23:22:03 -0500 Subject: [PATCH 13/19] Update some RST documentation syntax (#130) * docs: Update syntax in README.rst * docs: Update code block syntax in docs/index.rst --------- Co-authored-by: HalfSweet <60973476+HalfSweet@users.noreply.github.com> --- README.rst | 165 +++++++++++++++++++++++++++++++------------------ docs/index.rst | 114 +++++++++++++++++++++------------- 2 files changed, 174 insertions(+), 105 deletions(-) diff --git a/README.rst b/README.rst index 8a5c71b38..b7acfc8a2 100644 --- a/README.rst +++ b/README.rst @@ -32,13 +32,19 @@ check code (perhaps using zookeeper or consul). For older brokers, you can achieve something similar by manually assigning different partitions to each consumer instance with config management tools like chef, ansible, etc. This approach will work fine, though it does not support rebalancing on failures. -See + +See https://kafka-python.readthedocs.io/en/master/compatibility.html + for more details. Please note that the master branch may contain unreleased features. For release documentation, please see readthedocs and/or python's inline help. ->>> pip install kafka-python-ng + +.. code-block:: bash + + $ pip install kafka-python-ng + KafkaConsumer @@ -48,42 +54,56 @@ KafkaConsumer is a high-level message consumer, intended to operate as similarly as possible to the official java client. Full support for coordinated consumer groups requires use of kafka brokers that support the Group APIs: kafka v0.9+. -See + +See https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html + for API and configuration details. The consumer iterator returns ConsumerRecords, which are simple namedtuples that expose basic message attributes: topic, partition, offset, key, and value: ->>> from kafka import KafkaConsumer ->>> consumer = KafkaConsumer('my_favorite_topic') ->>> for msg in consumer: -... print (msg) +.. code-block:: python ->>> # join a consumer group for dynamic partition assignment and offset commits ->>> from kafka import KafkaConsumer ->>> consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group') ->>> for msg in consumer: -... print (msg) + from kafka import KafkaConsumer + consumer = KafkaConsumer('my_favorite_topic') + for msg in consumer: + print (msg) ->>> # manually assign the partition list for the consumer ->>> from kafka import TopicPartition ->>> consumer = KafkaConsumer(bootstrap_servers='localhost:1234') ->>> consumer.assign([TopicPartition('foobar', 2)]) ->>> msg = next(consumer) +.. code-block:: python ->>> # Deserialize msgpack-encoded values ->>> consumer = KafkaConsumer(value_deserializer=msgpack.loads) ->>> consumer.subscribe(['msgpackfoo']) ->>> for msg in consumer: -... assert isinstance(msg.value, dict) + # join a consumer group for dynamic partition assignment and offset commits + from kafka import KafkaConsumer + consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group') + for msg in consumer: + print (msg) ->>> # Access record headers. The returned value is a list of tuples ->>> # with str, bytes for key and value ->>> for msg in consumer: -... print (msg.headers) +.. code-block:: python ->>> # Get consumer metrics ->>> metrics = consumer.metrics() + # manually assign the partition list for the consumer + from kafka import TopicPartition + consumer = KafkaConsumer(bootstrap_servers='localhost:1234') + consumer.assign([TopicPartition('foobar', 2)]) + msg = next(consumer) + +.. code-block:: python + + # Deserialize msgpack-encoded values + consumer = KafkaConsumer(value_deserializer=msgpack.loads) + consumer.subscribe(['msgpackfoo']) + for msg in consumer: + assert isinstance(msg.value, dict) + +.. code-block:: python + + # Access record headers. The returned value is a list of tuples + # with str, bytes for key and value + for msg in consumer: + print (msg.headers) + +.. code-block:: python + + # Get consumer metrics + metrics = consumer.metrics() KafkaProducer @@ -91,46 +111,66 @@ KafkaProducer KafkaProducer is a high-level, asynchronous message producer. The class is intended to operate as similarly as possible to the official java client. -See + +See https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html + for more details. ->>> from kafka import KafkaProducer ->>> producer = KafkaProducer(bootstrap_servers='localhost:1234') ->>> for _ in range(100): -... producer.send('foobar', b'some_message_bytes') +.. code-block:: python + + from kafka import KafkaProducer + producer = KafkaProducer(bootstrap_servers='localhost:1234') + for _ in range(100): + producer.send('foobar', b'some_message_bytes') + +.. code-block:: python + + # Block until a single message is sent (or timeout) + future = producer.send('foobar', b'another_message') + result = future.get(timeout=60) + +.. code-block:: python + + # Block until all pending messages are at least put on the network + # NOTE: This does not guarantee delivery or success! It is really + # only useful if you configure internal batching using linger_ms + producer.flush() + +.. code-block:: python ->>> # Block until a single message is sent (or timeout) ->>> future = producer.send('foobar', b'another_message') ->>> result = future.get(timeout=60) + # Use a key for hashed-partitioning + producer.send('foobar', key=b'foo', value=b'bar') ->>> # Block until all pending messages are at least put on the network ->>> # NOTE: This does not guarantee delivery or success! It is really ->>> # only useful if you configure internal batching using linger_ms ->>> producer.flush() +.. code-block:: python ->>> # Use a key for hashed-partitioning ->>> producer.send('foobar', key=b'foo', value=b'bar') + # Serialize json messages + import json + producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8')) + producer.send('fizzbuzz', {'foo': 'bar'}) ->>> # Serialize json messages ->>> import json ->>> producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8')) ->>> producer.send('fizzbuzz', {'foo': 'bar'}) +.. code-block:: python ->>> # Serialize string keys ->>> producer = KafkaProducer(key_serializer=str.encode) ->>> producer.send('flipflap', key='ping', value=b'1234') + # Serialize string keys + producer = KafkaProducer(key_serializer=str.encode) + producer.send('flipflap', key='ping', value=b'1234') ->>> # Compress messages ->>> producer = KafkaProducer(compression_type='gzip') ->>> for i in range(1000): -... producer.send('foobar', b'msg %d' % i) +.. code-block:: python ->>> # Include record headers. The format is list of tuples with string key ->>> # and bytes value. ->>> producer.send('foobar', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64')]) + # Compress messages + producer = KafkaProducer(compression_type='gzip') + for i in range(1000): + producer.send('foobar', b'msg %d' % i) ->>> # Get producer performance metrics ->>> metrics = producer.metrics() +.. code-block:: python + + # Include record headers. The format is list of tuples with string key + # and bytes value. + producer.send('foobar', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64')]) + +.. code-block:: python + + # Get producer performance metrics + metrics = producer.metrics() Thread safety @@ -154,7 +194,9 @@ kafka-python-ng supports the following compression formats: - Zstandard (zstd) gzip is supported natively, the others require installing additional libraries. -See for more information. + +See https://kafka-python.readthedocs.io/en/master/install.html for more information. + Optimized CRC32 Validation @@ -162,8 +204,9 @@ Optimized CRC32 Validation Kafka uses CRC32 checksums to validate messages. kafka-python-ng includes a pure python implementation for compatibility. To improve performance for high-throughput -applications, kafka-python-ng will use `crc32c` for optimized native code if installed. -See for installation instructions. +applications, kafka-python will use `crc32c` for optimized native code if installed. +See https://kafka-python.readthedocs.io/en/master/install.html for installation instructions. + See https://pypi.org/project/crc32c/ for details on the underlying crc32c lib. diff --git a/docs/index.rst b/docs/index.rst index 92b998d92..779ad997b 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -31,7 +31,11 @@ failures. See `Compatibility `_ for more details. Please note that the master branch may contain unreleased features. For release documentation, please see readthedocs and/or python's inline help. ->>> pip install kafka-python-ng + +.. code:: bash + + pip install kafka-python-ng + KafkaConsumer @@ -47,28 +51,36 @@ See `KafkaConsumer `_ for API and configuration detai The consumer iterator returns ConsumerRecords, which are simple namedtuples that expose basic message attributes: topic, partition, offset, key, and value: ->>> from kafka import KafkaConsumer ->>> consumer = KafkaConsumer('my_favorite_topic') ->>> for msg in consumer: -... print (msg) +.. code:: python + + from kafka import KafkaConsumer + consumer = KafkaConsumer('my_favorite_topic') + for msg in consumer: + print (msg) + +.. code:: python + + # join a consumer group for dynamic partition assignment and offset commits + from kafka import KafkaConsumer + consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group') + for msg in consumer: + print (msg) + +.. code:: python ->>> # join a consumer group for dynamic partition assignment and offset commits ->>> from kafka import KafkaConsumer ->>> consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group') ->>> for msg in consumer: -... print (msg) + # manually assign the partition list for the consumer + from kafka import TopicPartition + consumer = KafkaConsumer(bootstrap_servers='localhost:1234') + consumer.assign([TopicPartition('foobar', 2)]) + msg = next(consumer) ->>> # manually assign the partition list for the consumer ->>> from kafka import TopicPartition ->>> consumer = KafkaConsumer(bootstrap_servers='localhost:1234') ->>> consumer.assign([TopicPartition('foobar', 2)]) ->>> msg = next(consumer) +.. code:: python ->>> # Deserialize msgpack-encoded values ->>> consumer = KafkaConsumer(value_deserializer=msgpack.loads) ->>> consumer.subscribe(['msgpackfoo']) ->>> for msg in consumer: -... assert isinstance(msg.value, dict) + # Deserialize msgpack-encoded values + consumer = KafkaConsumer(value_deserializer=msgpack.loads) + consumer.subscribe(['msgpackfoo']) + for msg in consumer: + assert isinstance(msg.value, dict) KafkaProducer @@ -78,36 +90,50 @@ KafkaProducer The class is intended to operate as similarly as possible to the official java client. See `KafkaProducer `_ for more details. ->>> from kafka import KafkaProducer ->>> producer = KafkaProducer(bootstrap_servers='localhost:1234') ->>> for _ in range(100): -... producer.send('foobar', b'some_message_bytes') +.. code:: python + + from kafka import KafkaProducer + producer = KafkaProducer(bootstrap_servers='localhost:1234') + for _ in range(100): + producer.send('foobar', b'some_message_bytes') + +.. code:: python + + # Block until a single message is sent (or timeout) + future = producer.send('foobar', b'another_message') + result = future.get(timeout=60) + +.. code:: python + + # Block until all pending messages are at least put on the network + # NOTE: This does not guarantee delivery or success! It is really + # only useful if you configure internal batching using linger_ms + producer.flush() + +.. code:: python + + # Use a key for hashed-partitioning + producer.send('foobar', key=b'foo', value=b'bar') ->>> # Block until a single message is sent (or timeout) ->>> future = producer.send('foobar', b'another_message') ->>> result = future.get(timeout=60) +.. code:: python ->>> # Block until all pending messages are at least put on the network ->>> # NOTE: This does not guarantee delivery or success! It is really ->>> # only useful if you configure internal batching using linger_ms ->>> producer.flush() + # Serialize json messages + import json + producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8')) + producer.send('fizzbuzz', {'foo': 'bar'}) ->>> # Use a key for hashed-partitioning ->>> producer.send('foobar', key=b'foo', value=b'bar') +.. code:: python ->>> # Serialize json messages ->>> import json ->>> producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8')) ->>> producer.send('fizzbuzz', {'foo': 'bar'}) + # Serialize string keys + producer = KafkaProducer(key_serializer=str.encode) + producer.send('flipflap', key='ping', value=b'1234') ->>> # Serialize string keys ->>> producer = KafkaProducer(key_serializer=str.encode) ->>> producer.send('flipflap', key='ping', value=b'1234') +.. code:: python ->>> # Compress messages ->>> producer = KafkaProducer(compression_type='gzip') ->>> for i in range(1000): -... producer.send('foobar', b'msg %d' % i) + # Compress messages + producer = KafkaProducer(compression_type='gzip') + for i in range(1000): + producer.send('foobar', b'msg %d' % i) Thread safety From 88763da301f72759911ec4c0e4dc8b4e9f83e124 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Sun, 10 Mar 2024 00:14:12 -0500 Subject: [PATCH 14/19] Fix crc32c's __main__ for Python 3 (#142) * Fix crc32c's __main__ for Python 3 * Remove TODO from _crc32c.py --------- Co-authored-by: Yonatan Goldschmidt --- kafka/record/_crc32c.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/kafka/record/_crc32c.py b/kafka/record/_crc32c.py index 9b51ad8a9..6642b5bbe 100644 --- a/kafka/record/_crc32c.py +++ b/kafka/record/_crc32c.py @@ -139,7 +139,5 @@ def crc(data): if __name__ == "__main__": import sys - # TODO remove the pylint disable once pylint fixes - # https://github.com/PyCQA/pylint/issues/2571 - data = sys.stdin.read() # pylint: disable=assignment-from-no-return + data = sys.stdin.buffer.read() # pylint: disable=assignment-from-no-return print(hex(crc(data))) From b1a4c53cfc426118cca491a552861e9b07592629 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Sun, 10 Mar 2024 00:59:32 -0500 Subject: [PATCH 15/19] Strip trailing dot off hostname. (#133) Co-authored-by: Dave Voutila --- kafka/conn.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/conn.py b/kafka/conn.py index 1efb8a0a1..1f3bc2006 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -496,7 +496,7 @@ def _wrap_ssl(self): try: self._sock = self._ssl_context.wrap_socket( self._sock, - server_hostname=self.host, + server_hostname=self.host.rstrip("."), do_handshake_on_connect=False) except ssl.SSLError as e: log.exception('%s: Failed to wrap socket in SSLContext!', self) From 18eaa2d29e7ba3d0f261e620397712b5d67c3a94 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Sun, 10 Mar 2024 01:47:35 -0500 Subject: [PATCH 16/19] Handle OSError to properly recycle SSL connection, fix infinite loop (#155) * handling OSError * better error output * removed traceback logging --------- Co-authored-by: Alexander Sibiryakov --- kafka/conn.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/conn.py b/kafka/conn.py index 1f3bc2006..80f17009c 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -510,7 +510,7 @@ def _try_handshake(self): # old ssl in python2.6 will swallow all SSLErrors here... except (SSLWantReadError, SSLWantWriteError): pass - except (SSLZeroReturnError, ConnectionError, TimeoutError, SSLEOFError): + except (SSLZeroReturnError, ConnectionError, TimeoutError, SSLEOFError, ssl.SSLError, OSError) as e: log.warning('SSL connection closed by server during handshake.') self.close(Errors.KafkaConnectionError('SSL connection closed by server during handshake')) # Other SSLErrors will be raised to user From 54cbd63a275e781c9db98b57d6daa36a15eaa0ff Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Sun, 10 Mar 2024 13:05:12 -0400 Subject: [PATCH 17/19] client_async: Allow throwing an exception upon socket error during (#134) wakeup When wakeup() is called, we sometime notice that we get an endless prints: "Unable to send to wakeup socket!". Those prints are spamming the logs. This commit aims to address it by allowing restating the application via an intentional exception raise. This behavior is configurable and its default is backward compatible. Signed-off-by: shimon-armis Co-authored-by: shimon-armis --- kafka/client_async.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index 530a1f441..3076c4ba0 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -154,6 +154,8 @@ class KafkaClient(object): sasl mechanism handshake. Default: one of bootstrap servers sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider instance. (See kafka.oauth.abstract). Default: None + raise_upon_socket_err_during_wakeup (bool): If set to True, raise an exception + upon socket error during wakeup(). Default: False """ DEFAULT_CONFIG = { @@ -192,7 +194,8 @@ class KafkaClient(object): 'sasl_plain_password': None, 'sasl_kerberos_service_name': 'kafka', 'sasl_kerberos_domain_name': None, - 'sasl_oauth_token_provider': None + 'sasl_oauth_token_provider': None, + 'raise_upon_socket_err_during_wakeup': False } def __init__(self, **configs): @@ -243,6 +246,8 @@ def __init__(self, **configs): check_timeout = self.config['api_version_auto_timeout_ms'] / 1000 self.config['api_version'] = self.check_version(timeout=check_timeout) + self._raise_upon_socket_err_during_wakeup = self.config['raise_upon_socket_err_during_wakeup'] + def _can_bootstrap(self): effective_failures = self._bootstrap_fails // self._num_bootstrap_hosts backoff_factor = 2 ** effective_failures @@ -936,8 +941,10 @@ def wakeup(self): except socket.timeout: log.warning('Timeout to send to wakeup socket!') raise Errors.KafkaTimeoutError() - except socket.error: + except socket.error as e: log.warning('Unable to send to wakeup socket!') + if self._raise_upon_socket_err_during_wakeup: + raise e def _clear_wake_fd(self): # reading from wake socket should only happen in a single thread From eb6fd9b46a1b4fa79a2e52da60ac809d0b0ddb8f Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Tue, 12 Mar 2024 17:05:12 -0400 Subject: [PATCH 18/19] Log connection errors at ERROR level (#139) Co-authored-by: drewdogg --- kafka/conn.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/conn.py b/kafka/conn.py index 80f17009c..d04acce3e 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -916,7 +916,7 @@ def close(self, error=None): with self._lock: if self.state is ConnectionStates.DISCONNECTED: return - log.info('%s: Closing connection. %s', self, error or '') + log.log(logging.ERROR if error else logging.INFO, '%s: Closing connection. %s', self, error or '') self._update_reconnect_backoff() self._sasl_auth_future = None self._protocol = KafkaProtocol( From ebcfcb1598c006e9499287348a660d12e5073239 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Sun, 17 Mar 2024 15:09:17 -0400 Subject: [PATCH 19/19] Add test_msk.py by @mattoberle --- test/test_msk.py | 68 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100644 test/test_msk.py diff --git a/test/test_msk.py b/test/test_msk.py new file mode 100644 index 000000000..4d06d4441 --- /dev/null +++ b/test/test_msk.py @@ -0,0 +1,68 @@ +import datetime +import json + +from kafka.msk import AwsMskIamClient + +try: + from unittest import mock +except ImportError: + import mock + + +def client_factory(token=None): + now = datetime.datetime.utcfromtimestamp(1629321911) + with mock.patch('kafka.msk.datetime') as mock_dt: + mock_dt.datetime.utcnow = mock.Mock(return_value=now) + return AwsMskIamClient( + host='localhost', + access_key='XXXXXXXXXXXXXXXXXXXX', + secret_key='XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX', + region='us-east-1', + token=token, + ) + + +def test_aws_msk_iam_client_permanent_credentials(): + client = client_factory(token=None) + msg = client.first_message() + assert msg + assert isinstance(msg, bytes) + actual = json.loads(msg) + + expected = { + 'version': '2020_10_22', + 'host': 'localhost', + 'user-agent': 'kafka-python', + 'action': 'kafka-cluster:Connect', + 'x-amz-algorithm': 'AWS4-HMAC-SHA256', + 'x-amz-credential': 'XXXXXXXXXXXXXXXXXXXX/20210818/us-east-1/kafka-cluster/aws4_request', + 'x-amz-date': '20210818T212511Z', + 'x-amz-signedheaders': 'host', + 'x-amz-expires': '900', + 'x-amz-signature': '0fa42ae3d5693777942a7a4028b564f0b372bafa2f71c1a19ad60680e6cb994b', + } + assert actual == expected + + +def test_aws_msk_iam_client_temporary_credentials(): + client = client_factory(token='XXXXX') + msg = client.first_message() + assert msg + assert isinstance(msg, bytes) + actual = json.loads(msg) + + expected = { + 'version': '2020_10_22', + 'host': 'localhost', + 'user-agent': 'kafka-python', + 'action': 'kafka-cluster:Connect', + 'x-amz-algorithm': 'AWS4-HMAC-SHA256', + 'x-amz-credential': 'XXXXXXXXXXXXXXXXXXXX/20210818/us-east-1/kafka-cluster/aws4_request', + 'x-amz-date': '20210818T212511Z', + 'x-amz-signedheaders': 'host', + 'x-amz-expires': '900', + 'x-amz-signature': 'b0619c50b7ecb4a7f6f92bd5f733770df5710e97b25146f97015c0b1db783b05', + 'x-amz-security-token': 'XXXXX', + } + assert actual == expected +