From d2d1d1b0c3cdba2312be01b1ced31023524f0279 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 5 Mar 2025 13:25:03 -0800 Subject: [PATCH 1/6] Rename SaslHandShake -> SaslHandshake; move to separate module --- kafka/conn.py | 11 +++++---- kafka/protocol/admin.py | 35 ---------------------------- kafka/protocol/sasl_handshake.py | 39 ++++++++++++++++++++++++++++++++ 3 files changed, 45 insertions(+), 40 deletions(-) create mode 100644 kafka/protocol/sasl_handshake.py diff --git a/kafka/conn.py b/kafka/conn.py index fd6943171..83c731728 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -24,16 +24,17 @@ from kafka.future import Future from kafka.metrics.stats import Avg, Count, Max, Rate from kafka.oauth.abstract import AbstractTokenProvider -from kafka.protocol.admin import DescribeAclsRequest, DescribeClientQuotasRequest, ListGroupsRequest, SaslHandShakeRequest +from kafka.protocol.admin import DescribeAclsRequest, DescribeClientQuotasRequest, ListGroupsRequest from kafka.protocol.api_versions import ApiVersionsRequest from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS from kafka.protocol.commit import OffsetFetchRequest +from kafka.protocol.fetch import FetchRequest from kafka.protocol.find_coordinator import FindCoordinatorRequest from kafka.protocol.list_offsets import ListOffsetsRequest -from kafka.protocol.produce import ProduceRequest from kafka.protocol.metadata import MetadataRequest -from kafka.protocol.fetch import FetchRequest from kafka.protocol.parser import KafkaProtocol +from kafka.protocol.produce import ProduceRequest +from kafka.protocol.sasl_handshake import SaslHandshakeRequest from kafka.protocol.types import Int32, Int8 from kafka.scram import ScramClient from kafka.version import __version__ @@ -624,8 +625,8 @@ def _try_authenticate(self): assert self.config['api_version'] is None or self.config['api_version'] >= (0, 10, 0) if self._sasl_auth_future is None: - # Build a SaslHandShakeRequest message - request = SaslHandShakeRequest[0](self.config['sasl_mechanism']) + # Build a SaslHandshakeRequest message + request = SaslHandshakeRequest[0](self.config['sasl_mechanism']) future = Future() sasl_response = self._send(request, blocking=True) sasl_response.add_callback(self._handle_sasl_handshake_response, future) diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index c237ef7e0..058325cb1 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -346,41 +346,6 @@ class DescribeGroupsRequest_v3(Request): ] -class SaslHandShakeResponse_v0(Response): - API_KEY = 17 - API_VERSION = 0 - SCHEMA = Schema( - ('error_code', Int16), - ('enabled_mechanisms', Array(String('utf-8'))) - ) - - -class SaslHandShakeResponse_v1(Response): - API_KEY = 17 - API_VERSION = 1 - SCHEMA = SaslHandShakeResponse_v0.SCHEMA - - -class SaslHandShakeRequest_v0(Request): - API_KEY = 17 - API_VERSION = 0 - RESPONSE_TYPE = SaslHandShakeResponse_v0 - SCHEMA = Schema( - ('mechanism', String('utf-8')) - ) - - -class SaslHandShakeRequest_v1(Request): - API_KEY = 17 - API_VERSION = 1 - RESPONSE_TYPE = SaslHandShakeResponse_v1 - SCHEMA = SaslHandShakeRequest_v0.SCHEMA - - -SaslHandShakeRequest = [SaslHandShakeRequest_v0, SaslHandShakeRequest_v1] -SaslHandShakeResponse = [SaslHandShakeResponse_v0, SaslHandShakeResponse_v1] - - class DescribeAclsResponse_v0(Response): API_KEY = 29 API_VERSION = 0 diff --git a/kafka/protocol/sasl_handshake.py b/kafka/protocol/sasl_handshake.py new file mode 100644 index 000000000..e91c856ca --- /dev/null +++ b/kafka/protocol/sasl_handshake.py @@ -0,0 +1,39 @@ +from __future__ import absolute_import + +from kafka.protocol.api import Request, Response +from kafka.protocol.types import Array, Int16, Schema, String + + +class SaslHandshakeResponse_v0(Response): + API_KEY = 17 + API_VERSION = 0 + SCHEMA = Schema( + ('error_code', Int16), + ('enabled_mechanisms', Array(String('utf-8'))) + ) + + +class SaslHandshakeResponse_v1(Response): + API_KEY = 17 + API_VERSION = 1 + SCHEMA = SaslHandshakeResponse_v0.SCHEMA + + +class SaslHandshakeRequest_v0(Request): + API_KEY = 17 + API_VERSION = 0 + RESPONSE_TYPE = SaslHandshakeResponse_v0 + SCHEMA = Schema( + ('mechanism', String('utf-8')) + ) + + +class SaslHandshakeRequest_v1(Request): + API_KEY = 17 + API_VERSION = 1 + RESPONSE_TYPE = SaslHandshakeResponse_v1 + SCHEMA = SaslHandshakeRequest_v0.SCHEMA + + +SaslHandshakeRequest = [SaslHandshakeRequest_v0, SaslHandshakeRequest_v1] +SaslHandshakeResponse = [SaslHandshakeResponse_v0, SaslHandshakeResponse_v1] From 60b1d164ba60cce0b1fc95b62add90b70836f25a Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 5 Mar 2025 13:32:05 -0800 Subject: [PATCH 2/6] SaslAuthenticate v0/v1 protocol defs --- kafka/protocol/sasl_authenticate.py | 42 +++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 kafka/protocol/sasl_authenticate.py diff --git a/kafka/protocol/sasl_authenticate.py b/kafka/protocol/sasl_authenticate.py new file mode 100644 index 000000000..528bb3cc6 --- /dev/null +++ b/kafka/protocol/sasl_authenticate.py @@ -0,0 +1,42 @@ +from __future__ import absolute_import + +from kafka.protocol.api import Request, Response +from kafka.protocol.types import Array, Bytes, Int16, Int64, Schema, String + + +class SaslAuthenticateResponse_v0(Response): + API_KEY = 36 + API_VERSION = 0 + SCHEMA = Schema( + ('error_code', Int16), + ('error_message', String('utf-8')), + ('auth_bytes', Bytes)) + + +class SaslAuthenticateResponse_v1(Response): + API_KEY = 36 + API_VERSION = 1 + SCHEMA = Schema( + ('error_code', Int16), + ('error_message', String('utf-8')), + ('auth_bytes', Bytes), + ('session_lifetime_ms', Int64)) + + +class SaslAuthenticateRequest_v0(Request): + API_KEY = 36 + API_VERSION = 0 + RESPONSE_TYPE = SaslAuthenticateResponse_v0 + SCHEMA = Schema( + ('auth_bytes', Bytes)) + + +class SaslAuthenticateRequest_v1(Request): + API_KEY = 36 + API_VERSION = 1 + RESPONSE_TYPE = SaslAuthenticateResponse_v1 + SCHEMA = SaslAuthenticateRequest_v0.SCHEMA + + +SaslAuthenticateRequest = [SaslAuthenticateRequest_v0, SaslAuthenticateRequest_v1] +SaslAuthenticateResponse = [SaslAuthenticateResponse_v0, SaslAuthenticateResponse_v1] From f786a6916d4110bea6023d13d1fa0d25e92ceed7 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 5 Mar 2025 22:45:45 -0800 Subject: [PATCH 3/6] Modular sasl mechanisms (wip) --- kafka/conn.py | 326 +++++++++----------------------------- kafka/sasl/__init__.py | 26 +++ kafka/sasl/abc.py | 27 ++++ kafka/sasl/gssapi.py | 73 +++++++++ kafka/sasl/oauth.py | 40 +++++ kafka/sasl/plain.py | 36 +++++ kafka/{ => sasl}/scram.py | 75 +++++++-- 7 files changed, 333 insertions(+), 270 deletions(-) create mode 100644 kafka/sasl/__init__.py create mode 100644 kafka/sasl/abc.py create mode 100644 kafka/sasl/gssapi.py create mode 100644 kafka/sasl/oauth.py create mode 100644 kafka/sasl/plain.py rename kafka/{ => sasl}/scram.py (52%) diff --git a/kafka/conn.py b/kafka/conn.py index 83c731728..73fe17874 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -34,9 +34,10 @@ from kafka.protocol.metadata import MetadataRequest from kafka.protocol.parser import KafkaProtocol from kafka.protocol.produce import ProduceRequest +from kafka.protocol.sasl_authenticate import SaslAuthenticateRequest from kafka.protocol.sasl_handshake import SaslHandshakeRequest from kafka.protocol.types import Int32, Int8 -from kafka.scram import ScramClient +from kafka.sasl import get_sasl_mechanism from kafka.version import __version__ @@ -49,10 +50,6 @@ DEFAULT_KAFKA_PORT = 9092 -SASL_QOP_AUTH = 1 -SASL_QOP_AUTH_INT = 2 -SASL_QOP_AUTH_CONF = 4 - try: import ssl ssl_available = True @@ -78,15 +75,6 @@ class SSLWantReadError(Exception): class SSLWantWriteError(Exception): pass -# needed for SASL_GSSAPI authentication: -try: - import gssapi - from gssapi.raw.misc import GSSError -except (ImportError, OSError): - #no gssapi available, will disable gssapi mechanism - gssapi = None - GSSError = None - AFI_NAMES = { socket.AF_UNSPEC: "unspecified", @@ -233,7 +221,6 @@ class BrokerConnection(object): 'sasl_oauth_token_provider': None } SECURITY_PROTOCOLS = ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL') - SASL_MECHANISMS = ('PLAIN', 'GSSAPI', 'OAUTHBEARER', "SCRAM-SHA-256", "SCRAM-SHA-512") VERSION_CHECKS = ( ((0, 9), ListGroupsRequest[0]()), ((0, 8, 2), FindCoordinatorRequest[0]('kafka-python-default-group')), @@ -272,26 +259,13 @@ def __init__(self, host, port, afi, **configs): assert self.config['security_protocol'] in self.SECURITY_PROTOCOLS, ( 'security_protocol must be in ' + ', '.join(self.SECURITY_PROTOCOLS)) + self._sasl_mechanism = None if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): assert ssl_available, "Python wasn't built with SSL support" if self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL'): - assert self.config['sasl_mechanism'] in self.SASL_MECHANISMS, ( - 'sasl_mechanism must be in ' + ', '.join(self.SASL_MECHANISMS)) - if self.config['sasl_mechanism'] in ('PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512'): - assert self.config['sasl_plain_username'] is not None, ( - 'sasl_plain_username required for PLAIN or SCRAM sasl' - ) - assert self.config['sasl_plain_password'] is not None, ( - 'sasl_plain_password required for PLAIN or SCRAM sasl' - ) - if self.config['sasl_mechanism'] == 'GSSAPI': - assert gssapi is not None, 'GSSAPI lib not available' - assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_kerberos_service_name required for GSSAPI sasl' - if self.config['sasl_mechanism'] == 'OAUTHBEARER': - token_provider = self.config['sasl_oauth_token_provider'] - assert token_provider is not None, 'sasl_oauth_token_provider required for OAUTHBEARER sasl' - assert callable(getattr(token_provider, "token", None)), 'sasl_oauth_token_provider must implement method #token()' + self._sasl_mechanism = get_sasl_mechanism(self.config['sasl_mechanism'])(**self.config) + # This is not a general lock / this class is not generally thread-safe yet # However, to avoid pushing responsibility for maintaining # per-connection locks to the upstream client, we will use this lock to @@ -621,12 +595,22 @@ def _handle_check_version_failure(self, future, ex): self._check_version_idx += 1 # after failure connection is closed, so state should already be DISCONNECTED - def _try_authenticate(self): - assert self.config['api_version'] is None or self.config['api_version'] >= (0, 10, 0) + def _sasl_handshake_version(self): + if self._api_versions is None: + raise RuntimeError('_api_versions not set') + if SaslHandshakeRequest[0].API_KEY not in self._api_versions: + raise Errors.UnsupportedVersionError('SaslHandshake') + # Build a SaslHandshakeRequest message + min_version, max_version = self._api_versions[SaslHandshakeRequest[0].API_KEY] + if min_version > 1: + raise Errors.UnsupportedVersionError('SaslHandshake %s' % min_version) + return min(max_version, 1) + + def _try_authenticate(self): if self._sasl_auth_future is None: - # Build a SaslHandshakeRequest message - request = SaslHandshakeRequest[0](self.config['sasl_mechanism']) + version = self._sasl_handshake_version() + request = SaslHandshakeRequest[version](self.config['sasl_mechanism']) future = Future() sasl_response = self._send(request, blocking=True) sasl_response.add_callback(self._handle_sasl_handshake_response, future) @@ -657,19 +641,9 @@ def _handle_sasl_handshake_response(self, future, response): Errors.UnsupportedSaslMechanismError( 'Kafka broker does not support %s sasl mechanism. Enabled mechanisms are: %s' % (self.config['sasl_mechanism'], response.enabled_mechanisms))) - elif self.config['sasl_mechanism'] == 'PLAIN': - self._try_authenticate_plain(future) - elif self.config['sasl_mechanism'] == 'GSSAPI': - self._try_authenticate_gssapi(future) - elif self.config['sasl_mechanism'] == 'OAUTHBEARER': - self._try_authenticate_oauth(future) - elif self.config['sasl_mechanism'].startswith("SCRAM-SHA-"): - self._try_authenticate_scram(future) else: - future.failure( - Errors.UnsupportedSaslMechanismError( - 'kafka-python does not support SASL mechanism %s' % - self.config['sasl_mechanism'])) + self._sasl_authenticate(future) + assert future.is_done, 'SASL future not complete after mechanism processing!' if future.failed(): self.close(error=future.exception) @@ -728,224 +702,66 @@ def _recv_bytes_blocking(self, n): finally: self._sock.settimeout(0.0) - def _try_authenticate_plain(self, future): - if self.config['security_protocol'] == 'SASL_PLAINTEXT': - log.warning('%s: Sending username and password in the clear', self) - - data = b'' - # Send PLAIN credentials per RFC-4616 - msg = bytes('\0'.join([self.config['sasl_plain_username'], - self.config['sasl_plain_username'], - self.config['sasl_plain_password']]).encode('utf-8')) - size = Int32.encode(len(msg)) - - err = None - close = False - with self._lock: - if not self._can_send_recv(): - err = Errors.NodeNotReadyError(str(self)) - close = False - else: - try: - self._send_bytes_blocking(size + msg) - - # The server will send a zero sized message (that is Int32(0)) on success. - # The connection is closed on failure - data = self._recv_bytes_blocking(4) - - except (ConnectionError, TimeoutError) as e: - log.exception("%s: Error receiving reply from server", self) - err = Errors.KafkaConnectionError("%s: %s" % (self, e)) - close = True - - if err is not None: - if close: + def _send_sasl_authenticate(self, sasl_auth_bytes): + version = self._sasl_handshake_version() + if version == 1: + request = SaslAuthenticateRequest[0](sasl_auth_bytes) + self._send(request, blocking=True) + else: + try: + self._send_bytes_blocking(Int32.encode(len(sasl_auth_bytes)) + sasl_auth_bytes) + except (ConnectionError, TimeoutError) as e: + log.exception("%s: Error sending sasl auth bytes to server", self) + err = Errors.KafkaConnectionError("%s: %s" % (self, e)) self.close(error=err) - return future.failure(err) - - if data != b'\x00\x00\x00\x00': - error = Errors.AuthenticationFailedError('Unrecognized response during authentication') - return future.failure(error) - - log.info('%s: Authenticated as %s via PLAIN', self, self.config['sasl_plain_username']) - return future.success(True) - - def _try_authenticate_scram(self, future): - if self.config['security_protocol'] == 'SASL_PLAINTEXT': - log.warning('%s: Exchanging credentials in the clear', self) - - scram_client = ScramClient( - self.config['sasl_plain_username'], self.config['sasl_plain_password'], self.config['sasl_mechanism'] - ) - - err = None - close = False - with self._lock: - if not self._can_send_recv(): - err = Errors.NodeNotReadyError(str(self)) - close = False - else: - try: - client_first = scram_client.first_message().encode('utf-8') - size = Int32.encode(len(client_first)) - self._send_bytes_blocking(size + client_first) - - (data_len,) = struct.unpack('>i', self._recv_bytes_blocking(4)) - server_first = self._recv_bytes_blocking(data_len).decode('utf-8') - scram_client.process_server_first_message(server_first) - - client_final = scram_client.final_message().encode('utf-8') - size = Int32.encode(len(client_final)) - self._send_bytes_blocking(size + client_final) - (data_len,) = struct.unpack('>i', self._recv_bytes_blocking(4)) - server_final = self._recv_bytes_blocking(data_len).decode('utf-8') - scram_client.process_server_final_message(server_final) + def _recv_sasl_authenticate(self): + version = self._sasl_handshake_version() + # GSSAPI mechanism does not get a final recv in old non-framed mode + if version == 0 and self._sasl_mechanism.is_done(): + return b'' - except (ConnectionError, TimeoutError) as e: - log.exception("%s: Error receiving reply from server", self) - err = Errors.KafkaConnectionError("%s: %s" % (self, e)) - close = True + try: + data = self._recv_bytes_blocking(4) + nbytes = Int32.decode(io.BytesIO(data)) + data += self._recv_bytes_blocking(nbytes) + except (ConnectionError, TimeoutError) as e: + log.exception("%s: Error receiving sasl auth bytes from server", self) + err = Errors.KafkaConnectionError("%s: %s" % (self, e)) + self.close(error=err) + return - if err is not None: - if close: - self.close(error=err) - return future.failure(err) - - log.info( - '%s: Authenticated as %s via %s', self, self.config['sasl_plain_username'], self.config['sasl_mechanism'] - ) - return future.success(True) - - def _try_authenticate_gssapi(self, future): - kerberos_damin_name = self.config['sasl_kerberos_domain_name'] or self.host - auth_id = self.config['sasl_kerberos_service_name'] + '@' + kerberos_damin_name - gssapi_name = gssapi.Name( - auth_id, - name_type=gssapi.NameType.hostbased_service - ).canonicalize(gssapi.MechType.kerberos) - log.debug('%s: GSSAPI name: %s', self, gssapi_name) + if version == 1: + ((_correlation_id, response),) = self._protocol.receive_bytes(data) + error_type = Errors.for_code(response.error_code) + if error_type is not Errors.NoError: + log.error("%s: SaslAuthenticate error: %s (%s)", + self, error_type.__name__, response.error_message) + self.close(error=error_type()) + return + return response.auth_bytes + else: + # unframed bytes w/ SaslHandhake v0 + return data[4:] - err = None - close = False - with self._lock: + def _sasl_authenticate(self, future): + while not self._sasl_mechanism.is_done(): + send_token = self._sasl_mechanism.auth_bytes() + self._send_sasl_authenticate(send_token) if not self._can_send_recv(): - err = Errors.NodeNotReadyError(str(self)) - close = False - else: - # Establish security context and negotiate protection level - # For reference RFC 2222, section 7.2.1 - try: - # Exchange tokens until authentication either succeeds or fails - client_ctx = gssapi.SecurityContext(name=gssapi_name, usage='initiate') - received_token = None - while not client_ctx.complete: - # calculate an output token from kafka token (or None if first iteration) - output_token = client_ctx.step(received_token) - - # pass output token to kafka, or send empty response if the security - # context is complete (output token is None in that case) - if output_token is None: - self._send_bytes_blocking(Int32.encode(0)) - else: - msg = output_token - size = Int32.encode(len(msg)) - self._send_bytes_blocking(size + msg) - - # The server will send a token back. Processing of this token either - # establishes a security context, or it needs further token exchange. - # The gssapi will be able to identify the needed next step. - # The connection is closed on failure. - header = self._recv_bytes_blocking(4) - (token_size,) = struct.unpack('>i', header) - received_token = self._recv_bytes_blocking(token_size) - - # Process the security layer negotiation token, sent by the server - # once the security context is established. - - # unwraps message containing supported protection levels and msg size - msg = client_ctx.unwrap(received_token).message - # Kafka currently doesn't support integrity or confidentiality security layers, so we - # simply set QoP to 'auth' only (first octet). We reuse the max message size proposed - # by the server - msg = Int8.encode(SASL_QOP_AUTH & Int8.decode(io.BytesIO(msg[0:1]))) + msg[1:] - # add authorization identity to the response, GSS-wrap and send it - msg = client_ctx.wrap(msg + auth_id.encode(), False).message - size = Int32.encode(len(msg)) - self._send_bytes_blocking(size + msg) - - except (ConnectionError, TimeoutError) as e: - log.exception("%s: Error receiving reply from server", self) - err = Errors.KafkaConnectionError("%s: %s" % (self, e)) - close = True - except Exception as e: - err = e - close = True - - if err is not None: - if close: - self.close(error=err) - return future.failure(err) - - log.info('%s: Authenticated as %s via GSSAPI', self, gssapi_name) - return future.success(True) + return future.failure(Errors.KafkaConnectionError("%s: Connection failure during Sasl Authenticate" % self)) - def _try_authenticate_oauth(self, future): - data = b'' - - msg = bytes(self._build_oauth_client_request().encode("utf-8")) - size = Int32.encode(len(msg)) - - err = None - close = False - with self._lock: - if not self._can_send_recv(): - err = Errors.NodeNotReadyError(str(self)) - close = False + recv_token = self._recv_sasl_authenticate() + if recv_token is None: + return future.failure(Errors.KafkaConnectionError("%s: Connection failure during Sasl Authenticate" % self)) else: - try: - # Send SASL OAuthBearer request with OAuth token - self._send_bytes_blocking(size + msg) - - # The server will send a zero sized message (that is Int32(0)) on success. - # The connection is closed on failure - data = self._recv_bytes_blocking(4) - - except (ConnectionError, TimeoutError) as e: - log.exception("%s: Error receiving reply from server", self) - err = Errors.KafkaConnectionError("%s: %s" % (self, e)) - close = True - - if err is not None: - if close: - self.close(error=err) - return future.failure(err) - - if data != b'\x00\x00\x00\x00': - error = Errors.AuthenticationFailedError('Unrecognized response during authentication') - return future.failure(error) - - log.info('%s: Authenticated via OAuth', self) - return future.success(True) - - def _build_oauth_client_request(self): - token_provider = self.config['sasl_oauth_token_provider'] - return "n,,\x01auth=Bearer {}{}\x01\x01".format(token_provider.token(), self._token_extensions()) - - def _token_extensions(self): - """ - Return a string representation of the OPTIONAL key-value pairs that can be sent with an OAUTHBEARER - initial request. - """ - token_provider = self.config['sasl_oauth_token_provider'] + self._sasl_mechanism.receive(recv_token) - # Only run if the #extensions() method is implemented by the clients Token Provider class - # Builds up a string separated by \x01 via a dict of key value pairs - if callable(getattr(token_provider, "extensions", None)) and len(token_provider.extensions()) > 0: - msg = "\x01".join(["{}={}".format(k, v) for k, v in token_provider.extensions().items()]) - return "\x01" + msg + if self._sasl_mechanism.is_authenticated(): + log.info('%s: Authenticated via %s', self, self.config['sasl_mechanism']) + return future.success(True) else: - return "" + return future.failure(Errors.AuthenticationFailedError('Failed to authenticate via SASL %s' % self.config['sasl_mechanism'])) def blacked_out(self): """ diff --git a/kafka/sasl/__init__.py b/kafka/sasl/__init__.py new file mode 100644 index 000000000..e36d1dfbd --- /dev/null +++ b/kafka/sasl/__init__.py @@ -0,0 +1,26 @@ +from __future__ import absolute_import + +from kafka.sasl.gssapi import SaslMechanismGSSAPI +from kafka.sasl.oauth import SaslMechanismOAuth +from kafka.sasl.plain import SaslMechanismPlain +from kafka.sasl.scram import SaslMechanismScram + + +SASL_MECHANISMS = {} + + +def register_sasl_mechanism(name, klass, overwrite=False): + if not overwrite and name in SASL_MECHANISMS: + raise ValueError('Sasl mechanism %s already defined!' % name) + SASL_MECHANISMS[name] = klass + + +def get_sasl_mechanism(name): + return SASL_MECHANISMS[name] + + +register_sasl_mechanism('GSSAPI', SaslMechanismGSSAPI) +register_sasl_mechanism('OAUTHBEARER', SaslMechanismOAuth) +register_sasl_mechanism('PLAIN', SaslMechanismPlain) +register_sasl_mechanism('SCRAM-SHA-256', SaslMechanismScram) +register_sasl_mechanism('SCRAM-SHA-512', SaslMechanismScram) diff --git a/kafka/sasl/abc.py b/kafka/sasl/abc.py new file mode 100644 index 000000000..7baef3b78 --- /dev/null +++ b/kafka/sasl/abc.py @@ -0,0 +1,27 @@ +from __future__ import absolute_import + +import abc + + +class SaslMechanism(object): + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def __init__(self, **config): + pass + + @abc.abstractmethod + def auth_bytes(self): + pass + + @abc.abstractmethod + def receive(self, auth_bytes): + pass + + @abc.abstractmethod + def is_done(self): + pass + + @abc.abstractmethod + def is_authenticated(self): + pass diff --git a/kafka/sasl/gssapi.py b/kafka/sasl/gssapi.py new file mode 100644 index 000000000..b40c37535 --- /dev/null +++ b/kafka/sasl/gssapi.py @@ -0,0 +1,73 @@ +from __future__ import absolute_import + +# needed for SASL_GSSAPI authentication: +try: + import gssapi + from gssapi.raw.misc import GSSError +except (ImportError, OSError): + #no gssapi available, will disable gssapi mechanism + gssapi = None + GSSError = None + +from kafka.sasl.abc import SaslMechanism + + +class SaslMechanismGSSAPI(SaslMechanism): + # Establish security context and negotiate protection level + # For reference RFC 2222, section 7.2.1 + + SASL_QOP_AUTH = 1 + SASL_QOP_AUTH_INT = 2 + SASL_QOP_AUTH_CONF = 4 + + def __init__(self, **config): + assert gssapi is not None, 'GSSAPI lib not available' + assert config['sasl_kerberos_service_name'] is not None, 'sasl_kerberos_service_name required for GSSAPI sasl' + self._is_done = False + self._is_authenticated = False + self.kerberos_damin_name = config['sasl_kerberos_domain_name'] or config['host'] + self.auth_id = config['sasl_kerberos_service_name'] + '@' + kerberos_damin_name + self.gssapi_name = gssapi.Name(auth_id, name_type=gssapi.NameType.hostbased_service).canonicalize(gssapi.MechType.kerberos) + self._client_ctx = gssapi.SecurityContext(name=self.gssapi_name, usage='initiate') + self._next_token = self._client_ctx.step(None) + + def auth_bytes(self): + # GSSAPI Auth does not have a final broker->client message + # so mark is_done after the final auth_bytes are provided + # in practice we'll still receive a response when using SaslAuthenticate + # but not when using the prior unframed approach. + if self._client_ctx.complete: + self._is_done = True + self._is_authenticated = True + return self._next_token or b'' + + def receive(self, auth_bytes): + if not self._client_ctx.complete: + # The server will send a token back. Processing of this token either + # establishes a security context, or it needs further token exchange. + # The gssapi will be able to identify the needed next step. + self._next_token = self._client_ctx.step(auth_bytes) + elif self._is_done: + # The final step of gssapi is send, so we do not expect any additional bytes + # however, allow an empty message to support SaslAuthenticate response + if auth_bytes != b'': + raise ValueError("Unexpected receive auth_bytes after sasl/gssapi completion") + else: + # unwraps message containing supported protection levels and msg size + msg = client_ctx.unwrap(received_token).message + # Kafka currently doesn't support integrity or confidentiality security layers, so we + # simply set QoP to 'auth' only (first octet). We reuse the max message size proposed + # by the server + message_parts = [ + Int8.encode(self.SASL_QOP_AUTH & Int8.decode(io.BytesIO(msg[0:1]))), + msg[:1], + self.auth_id.encode(), + ] + # add authorization identity to the response, and GSS-wrap + self._next_token = self._client_ctx.wrap(b''.join(message_parts), False).message + + def is_done(self): + return self._is_done + + def is_authenticated(self): + return self._is_authenticated diff --git a/kafka/sasl/oauth.py b/kafka/sasl/oauth.py new file mode 100644 index 000000000..d4f8df983 --- /dev/null +++ b/kafka/sasl/oauth.py @@ -0,0 +1,40 @@ +from __future__ import absolute_import + +from kafka.sasl.abc import SaslMechanism + + +class SaslMechanismOAuth(SaslMechanism): + + def __init__(self, **config): + self.token_provider = config['sasl_oauth_token_provider'] + assert self.token_provider is not None, 'sasl_oauth_token_provider required for OAUTHBEARER sasl' + assert callable(getattr(self.token_provider, 'token', None)), 'sasl_oauth_token_provider must implement method #token()' + self._is_done = False + self._is_authenticated = False + + def auth_bytes(self): + token = self.token_provider.token() + extensions = self._token_extensions() + return "n,,\x01auth=Bearer {}{}\x01\x01".format(token, extensions).encode('utf-8') + + def receive(self, auth_bytes): + if auth_bytes != b'': + self._is_authenticated = False + self._is_done = True + + def is_done(self): + return self._is_done + + def is_authenticated(self): + return self._is_authenticated + + def _token_extensions(self): + """ + Return a string representation of the OPTIONAL key-value pairs that can be sent with an OAUTHBEARER + initial request. + """ + # Only run if the #extensions() method is implemented by the clients Token Provider class + # Builds up a string separated by \x01 via a dict of key value pairs + extensions = getattr(self.token_provider, 'extensions', lambda: [])() + msg = '\x01'.join(['{}={}'.format(k, v) for k, v in extensions.items()]) + return '\x01' + msg if msg else '' diff --git a/kafka/sasl/plain.py b/kafka/sasl/plain.py new file mode 100644 index 000000000..f2bae6751 --- /dev/null +++ b/kafka/sasl/plain.py @@ -0,0 +1,36 @@ +from __future__ import absolute_import + +import logging + +from kafka.sasl.abc import SaslMechanism + + +log = logging.getLogger(__name__) + + +class SaslMechanismPlain(SaslMechanism): + + def __init__(self, **config): + if config['security_protocol'] == 'SASL_PLAINTEXT': + log.warning('Sending username and password in the clear') + assert config['sasl_plain_username'] is not None, 'sasl_plain_username required for PLAIN sasl' + assert config['sasl_plain_password'] is not None, 'sasl_plain_password required for PLAIN sasl' + + self.username = config['sasl_plain_username'] + self.password = config['sasl_plain_password'] + self._is_done = False + self._is_authenticated = False + + def auth_bytes(self): + # Send PLAIN credentials per RFC-4616 + return bytes('\0'.join([self.username, self.username, self.password]).encode('utf-8')) + + def receive(self, auth_bytes): + self._is_done = True + self._is_authenticated = auth_bytes == b'' + + def is_done(self): + return self._is_done + + def is_authenticated(self): + return self._is_authenticated diff --git a/kafka/scram.py b/kafka/sasl/scram.py similarity index 52% rename from kafka/scram.py rename to kafka/sasl/scram.py index 7f003750c..0bae8c928 100644 --- a/kafka/scram.py +++ b/kafka/sasl/scram.py @@ -3,11 +3,17 @@ import base64 import hashlib import hmac +import logging import uuid + +from kafka.sasl.abc import SaslMechanism from kafka.vendor import six +log = logging.getLogger(__name__) + + if six.PY2: def xor_bytes(left, right): return bytearray(ord(lb) ^ ord(rb) for lb, rb in zip(left, right)) @@ -16,6 +22,47 @@ def xor_bytes(left, right): return bytes(lb ^ rb for lb, rb in zip(left, right)) +class SaslMechanismScram(SaslMechanism): + + def __init__(self, **config): + assert config['sasl_plain_username'] is not None, 'sasl_plain_username required for SCRAM sasl' + assert config['sasl_plain_password'] is not None, 'sasl_plain_password required for SCRAM sasl' + if config['security_protocol'] == 'SASL_PLAINTEXT': + log.warning('Exchanging credentials in the clear during Sasl Authentication') + + self._scram_client = ScramClient( + config['sasl_plain_username'], + config['sasl_plain_password'], + config['sasl_mechanism'] + ) + self._state = 0 + + def auth_bytes(self): + if self._state == 0: + return self._scram_client.first_message() + elif self._state == 1: + return self._scram_client.final_message() + else: + raise ValueError('No auth_bytes for state: %s' % self._state) + + def receive(self, auth_bytes): + if self._state == 0: + self._scram_client.process_server_first_message(auth_bytes) + elif self._state == 1: + self._scram_client.process_server_final_message(auth_bytes) + else: + raise ValueError('Cannot receive bytes in state: %s' % self._state) + self._state += 1 + return self.is_done() + + def is_done(self): + return self._state == 2 + + def is_authenticated(self): + # receive raises if authentication fails...? + return self._state == 2 + + class ScramClient: MECHANISMS = { 'SCRAM-SHA-256': hashlib.sha256, @@ -23,10 +70,10 @@ class ScramClient: } def __init__(self, user, password, mechanism): - self.nonce = str(uuid.uuid4()).replace('-', '') - self.auth_message = '' + self.nonce = str(uuid.uuid4()).replace('-', '').encode('utf-8') + self.auth_message = b'' self.salted_password = None - self.user = user + self.user = user.encode('utf-8') self.password = password.encode('utf-8') self.hashfunc = self.MECHANISMS[mechanism] self.hashname = ''.join(mechanism.lower().split('-')[1:3]) @@ -38,18 +85,18 @@ def __init__(self, user, password, mechanism): self.server_signature = None def first_message(self): - client_first_bare = 'n={},r={}'.format(self.user, self.nonce) + client_first_bare = b'n=' + self.user + b',r=' + self.nonce self.auth_message += client_first_bare - return 'n,,' + client_first_bare + return b'n,,' + client_first_bare def process_server_first_message(self, server_first_message): - self.auth_message += ',' + server_first_message - params = dict(pair.split('=', 1) for pair in server_first_message.split(',')) - server_nonce = params['r'] + self.auth_message += b',' + server_first_message + params = dict(pair.split('=', 1) for pair in server_first_message.decode('utf-8').split(',')) + server_nonce = params['r'].encode('utf-8') if not server_nonce.startswith(self.nonce): raise ValueError("Server nonce, did not start with client nonce!") self.nonce = server_nonce - self.auth_message += ',c=biws,r=' + self.nonce + self.auth_message += b',c=biws,r=' + self.nonce salt = base64.b64decode(params['s'].encode('utf-8')) iterations = int(params['i']) @@ -57,10 +104,10 @@ def process_server_first_message(self, server_first_message): self.client_key = self.hmac(self.salted_password, b'Client Key') self.stored_key = self.hashfunc(self.client_key).digest() - self.client_signature = self.hmac(self.stored_key, self.auth_message.encode('utf-8')) + self.client_signature = self.hmac(self.stored_key, self.auth_message) self.client_proof = xor_bytes(self.client_key, self.client_signature) self.server_key = self.hmac(self.salted_password, b'Server Key') - self.server_signature = self.hmac(self.server_key, self.auth_message.encode('utf-8')) + self.server_signature = self.hmac(self.server_key, self.auth_message) def hmac(self, key, msg): return hmac.new(key, msg, digestmod=self.hashfunc).digest() @@ -71,11 +118,9 @@ def create_salted_password(self, salt, iterations): ) def final_message(self): - return 'c=biws,r={},p={}'.format(self.nonce, base64.b64encode(self.client_proof).decode('utf-8')) + return b'c=biws,r=' + self.nonce + b',p=' + base64.b64encode(self.client_proof) def process_server_final_message(self, server_final_message): - params = dict(pair.split('=', 1) for pair in server_final_message.split(',')) + params = dict(pair.split('=', 1) for pair in server_final_message.decode('utf-8').split(',')) if self.server_signature != base64.b64decode(params['v'].encode('utf-8')): raise ValueError("Server sent wrong signature!") - - From c6419778812ac84ad08b9f4db6dc577a74ed1541 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 6 Mar 2025 07:17:59 -0800 Subject: [PATCH 4/6] pop from in flight requests --- kafka/conn.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/kafka/conn.py b/kafka/conn.py index 73fe17874..173930644 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -732,7 +732,13 @@ def _recv_sasl_authenticate(self): return if version == 1: - ((_correlation_id, response),) = self._protocol.receive_bytes(data) + ((correlation_id, response),) = self._protocol.receive_bytes(data) + (future, timestamp, _timeout) = self.in_flight_requests.pop(correlation_id) + latency_ms = (time.time() - timestamp) * 1000 + if self._sensors: + self._sensors.request_time.record(latency_ms) + log.debug('%s Response %d (%s ms): %s', self, correlation_id, latency_ms, response) + error_type = Errors.for_code(response.error_code) if error_type is not Errors.NoError: log.error("%s: SaslAuthenticate error: %s (%s)", From 331774fb43fb7465195334e0483128312b01b3d4 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 6 Mar 2025 07:44:08 -0800 Subject: [PATCH 5/6] include error_message --- kafka/conn.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/conn.py b/kafka/conn.py index 173930644..988f4399f 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -743,7 +743,7 @@ def _recv_sasl_authenticate(self): if error_type is not Errors.NoError: log.error("%s: SaslAuthenticate error: %s (%s)", self, error_type.__name__, response.error_message) - self.close(error=error_type()) + self.close(error=error_type(response.error_message)) return return response.auth_bytes else: From d52942361fed4d1c749d166ca964e4135551beed Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 7 Mar 2025 22:08:20 -0800 Subject: [PATCH 6/6] fixup oauth receive --- kafka/sasl/oauth.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/kafka/sasl/oauth.py b/kafka/sasl/oauth.py index d4f8df983..7bbc7dd43 100644 --- a/kafka/sasl/oauth.py +++ b/kafka/sasl/oauth.py @@ -18,9 +18,8 @@ def auth_bytes(self): return "n,,\x01auth=Bearer {}{}\x01\x01".format(token, extensions).encode('utf-8') def receive(self, auth_bytes): - if auth_bytes != b'': - self._is_authenticated = False - self._is_done = True + self._is_done = True + self._is_authenticated = auth_bytes == b'' def is_done(self): return self._is_done