Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Sasl authentication with SaslMechanism abstract base class; support SaslAuthenticate #2515

Merged
merged 6 commits into from
Mar 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
337 changes: 80 additions & 257 deletions kafka/conn.py

Large diffs are not rendered by default.

35 changes: 0 additions & 35 deletions kafka/protocol/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,41 +346,6 @@ class DescribeGroupsRequest_v3(Request):
]


class SaslHandShakeResponse_v0(Response):
API_KEY = 17
API_VERSION = 0
SCHEMA = Schema(
('error_code', Int16),
('enabled_mechanisms', Array(String('utf-8')))
)


class SaslHandShakeResponse_v1(Response):
API_KEY = 17
API_VERSION = 1
SCHEMA = SaslHandShakeResponse_v0.SCHEMA


class SaslHandShakeRequest_v0(Request):
API_KEY = 17
API_VERSION = 0
RESPONSE_TYPE = SaslHandShakeResponse_v0
SCHEMA = Schema(
('mechanism', String('utf-8'))
)


class SaslHandShakeRequest_v1(Request):
API_KEY = 17
API_VERSION = 1
RESPONSE_TYPE = SaslHandShakeResponse_v1
SCHEMA = SaslHandShakeRequest_v0.SCHEMA


SaslHandShakeRequest = [SaslHandShakeRequest_v0, SaslHandShakeRequest_v1]
SaslHandShakeResponse = [SaslHandShakeResponse_v0, SaslHandShakeResponse_v1]


class DescribeAclsResponse_v0(Response):
API_KEY = 29
API_VERSION = 0
Expand Down
42 changes: 42 additions & 0 deletions kafka/protocol/sasl_authenticate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from __future__ import absolute_import

from kafka.protocol.api import Request, Response
from kafka.protocol.types import Array, Bytes, Int16, Int64, Schema, String


class SaslAuthenticateResponse_v0(Response):
API_KEY = 36
API_VERSION = 0
SCHEMA = Schema(
('error_code', Int16),
('error_message', String('utf-8')),
('auth_bytes', Bytes))


class SaslAuthenticateResponse_v1(Response):
API_KEY = 36
API_VERSION = 1
SCHEMA = Schema(
('error_code', Int16),
('error_message', String('utf-8')),
('auth_bytes', Bytes),
('session_lifetime_ms', Int64))


class SaslAuthenticateRequest_v0(Request):
API_KEY = 36
API_VERSION = 0
RESPONSE_TYPE = SaslAuthenticateResponse_v0
SCHEMA = Schema(
('auth_bytes', Bytes))


class SaslAuthenticateRequest_v1(Request):
API_KEY = 36
API_VERSION = 1
RESPONSE_TYPE = SaslAuthenticateResponse_v1
SCHEMA = SaslAuthenticateRequest_v0.SCHEMA


SaslAuthenticateRequest = [SaslAuthenticateRequest_v0, SaslAuthenticateRequest_v1]
SaslAuthenticateResponse = [SaslAuthenticateResponse_v0, SaslAuthenticateResponse_v1]
39 changes: 39 additions & 0 deletions kafka/protocol/sasl_handshake.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from __future__ import absolute_import

from kafka.protocol.api import Request, Response
from kafka.protocol.types import Array, Int16, Schema, String


class SaslHandshakeResponse_v0(Response):
API_KEY = 17
API_VERSION = 0
SCHEMA = Schema(
('error_code', Int16),
('enabled_mechanisms', Array(String('utf-8')))
)


class SaslHandshakeResponse_v1(Response):
API_KEY = 17
API_VERSION = 1
SCHEMA = SaslHandshakeResponse_v0.SCHEMA


class SaslHandshakeRequest_v0(Request):
API_KEY = 17
API_VERSION = 0
RESPONSE_TYPE = SaslHandshakeResponse_v0
SCHEMA = Schema(
('mechanism', String('utf-8'))
)


class SaslHandshakeRequest_v1(Request):
API_KEY = 17
API_VERSION = 1
RESPONSE_TYPE = SaslHandshakeResponse_v1
SCHEMA = SaslHandshakeRequest_v0.SCHEMA


SaslHandshakeRequest = [SaslHandshakeRequest_v0, SaslHandshakeRequest_v1]
SaslHandshakeResponse = [SaslHandshakeResponse_v0, SaslHandshakeResponse_v1]
26 changes: 26 additions & 0 deletions kafka/sasl/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from __future__ import absolute_import

from kafka.sasl.gssapi import SaslMechanismGSSAPI
from kafka.sasl.oauth import SaslMechanismOAuth
from kafka.sasl.plain import SaslMechanismPlain
from kafka.sasl.scram import SaslMechanismScram


SASL_MECHANISMS = {}


def register_sasl_mechanism(name, klass, overwrite=False):
if not overwrite and name in SASL_MECHANISMS:
raise ValueError('Sasl mechanism %s already defined!' % name)
SASL_MECHANISMS[name] = klass


def get_sasl_mechanism(name):
return SASL_MECHANISMS[name]


register_sasl_mechanism('GSSAPI', SaslMechanismGSSAPI)
register_sasl_mechanism('OAUTHBEARER', SaslMechanismOAuth)
register_sasl_mechanism('PLAIN', SaslMechanismPlain)
register_sasl_mechanism('SCRAM-SHA-256', SaslMechanismScram)
register_sasl_mechanism('SCRAM-SHA-512', SaslMechanismScram)
27 changes: 27 additions & 0 deletions kafka/sasl/abc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from __future__ import absolute_import

import abc


class SaslMechanism(object):
__metaclass__ = abc.ABCMeta

@abc.abstractmethod
def __init__(self, **config):
pass

@abc.abstractmethod
def auth_bytes(self):
pass

@abc.abstractmethod
def receive(self, auth_bytes):
pass

@abc.abstractmethod
def is_done(self):
pass

@abc.abstractmethod
def is_authenticated(self):
pass
73 changes: 73 additions & 0 deletions kafka/sasl/gssapi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
from __future__ import absolute_import

# needed for SASL_GSSAPI authentication:
try:
import gssapi
from gssapi.raw.misc import GSSError
except (ImportError, OSError):
#no gssapi available, will disable gssapi mechanism
gssapi = None
GSSError = None

from kafka.sasl.abc import SaslMechanism


class SaslMechanismGSSAPI(SaslMechanism):
# Establish security context and negotiate protection level
# For reference RFC 2222, section 7.2.1

SASL_QOP_AUTH = 1
SASL_QOP_AUTH_INT = 2
SASL_QOP_AUTH_CONF = 4

def __init__(self, **config):
assert gssapi is not None, 'GSSAPI lib not available'
assert config['sasl_kerberos_service_name'] is not None, 'sasl_kerberos_service_name required for GSSAPI sasl'
self._is_done = False
self._is_authenticated = False
self.kerberos_damin_name = config['sasl_kerberos_domain_name'] or config['host']
self.auth_id = config['sasl_kerberos_service_name'] + '@' + kerberos_damin_name
self.gssapi_name = gssapi.Name(auth_id, name_type=gssapi.NameType.hostbased_service).canonicalize(gssapi.MechType.kerberos)
self._client_ctx = gssapi.SecurityContext(name=self.gssapi_name, usage='initiate')
self._next_token = self._client_ctx.step(None)

def auth_bytes(self):
# GSSAPI Auth does not have a final broker->client message
# so mark is_done after the final auth_bytes are provided
# in practice we'll still receive a response when using SaslAuthenticate
# but not when using the prior unframed approach.
if self._client_ctx.complete:
self._is_done = True
self._is_authenticated = True
return self._next_token or b''

def receive(self, auth_bytes):
if not self._client_ctx.complete:
# The server will send a token back. Processing of this token either
# establishes a security context, or it needs further token exchange.
# The gssapi will be able to identify the needed next step.
self._next_token = self._client_ctx.step(auth_bytes)
elif self._is_done:
# The final step of gssapi is send, so we do not expect any additional bytes
# however, allow an empty message to support SaslAuthenticate response
if auth_bytes != b'':
raise ValueError("Unexpected receive auth_bytes after sasl/gssapi completion")
else:
# unwraps message containing supported protection levels and msg size
msg = client_ctx.unwrap(received_token).message
# Kafka currently doesn't support integrity or confidentiality security layers, so we
# simply set QoP to 'auth' only (first octet). We reuse the max message size proposed
# by the server
message_parts = [
Int8.encode(self.SASL_QOP_AUTH & Int8.decode(io.BytesIO(msg[0:1]))),
msg[:1],
self.auth_id.encode(),
]
# add authorization identity to the response, and GSS-wrap
self._next_token = self._client_ctx.wrap(b''.join(message_parts), False).message

def is_done(self):
return self._is_done

def is_authenticated(self):
return self._is_authenticated
39 changes: 39 additions & 0 deletions kafka/sasl/oauth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from __future__ import absolute_import

from kafka.sasl.abc import SaslMechanism


class SaslMechanismOAuth(SaslMechanism):

def __init__(self, **config):
self.token_provider = config['sasl_oauth_token_provider']
assert self.token_provider is not None, 'sasl_oauth_token_provider required for OAUTHBEARER sasl'
assert callable(getattr(self.token_provider, 'token', None)), 'sasl_oauth_token_provider must implement method #token()'
self._is_done = False
self._is_authenticated = False

def auth_bytes(self):
token = self.token_provider.token()
extensions = self._token_extensions()
return "n,,\x01auth=Bearer {}{}\x01\x01".format(token, extensions).encode('utf-8')

def receive(self, auth_bytes):
self._is_done = True
self._is_authenticated = auth_bytes == b''

def is_done(self):
return self._is_done

def is_authenticated(self):
return self._is_authenticated

def _token_extensions(self):
"""
Return a string representation of the OPTIONAL key-value pairs that can be sent with an OAUTHBEARER
initial request.
"""
# Only run if the #extensions() method is implemented by the clients Token Provider class
# Builds up a string separated by \x01 via a dict of key value pairs
extensions = getattr(self.token_provider, 'extensions', lambda: [])()
msg = '\x01'.join(['{}={}'.format(k, v) for k, v in extensions.items()])
return '\x01' + msg if msg else ''
36 changes: 36 additions & 0 deletions kafka/sasl/plain.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from __future__ import absolute_import

import logging

from kafka.sasl.abc import SaslMechanism


log = logging.getLogger(__name__)


class SaslMechanismPlain(SaslMechanism):

def __init__(self, **config):
if config['security_protocol'] == 'SASL_PLAINTEXT':
log.warning('Sending username and password in the clear')
assert config['sasl_plain_username'] is not None, 'sasl_plain_username required for PLAIN sasl'
assert config['sasl_plain_password'] is not None, 'sasl_plain_password required for PLAIN sasl'

self.username = config['sasl_plain_username']
self.password = config['sasl_plain_password']
self._is_done = False
self._is_authenticated = False

def auth_bytes(self):
# Send PLAIN credentials per RFC-4616
return bytes('\0'.join([self.username, self.username, self.password]).encode('utf-8'))

def receive(self, auth_bytes):
self._is_done = True
self._is_authenticated = auth_bytes == b''

def is_done(self):
return self._is_done

def is_authenticated(self):
return self._is_authenticated
Loading