Skip to content

Commit 8320cc8

Browse files
authored
Refactor Sasl authentication with SaslMechanism abstract base class; support SaslAuthenticate (#2515)
1 parent f046b0c commit 8320cc8

File tree

10 files changed

+422
-307
lines changed

10 files changed

+422
-307
lines changed

kafka/conn.py

+80-257
Large diffs are not rendered by default.

kafka/protocol/admin.py

-35
Original file line numberDiff line numberDiff line change
@@ -346,41 +346,6 @@ class DescribeGroupsRequest_v3(Request):
346346
]
347347

348348

349-
class SaslHandShakeResponse_v0(Response):
350-
API_KEY = 17
351-
API_VERSION = 0
352-
SCHEMA = Schema(
353-
('error_code', Int16),
354-
('enabled_mechanisms', Array(String('utf-8')))
355-
)
356-
357-
358-
class SaslHandShakeResponse_v1(Response):
359-
API_KEY = 17
360-
API_VERSION = 1
361-
SCHEMA = SaslHandShakeResponse_v0.SCHEMA
362-
363-
364-
class SaslHandShakeRequest_v0(Request):
365-
API_KEY = 17
366-
API_VERSION = 0
367-
RESPONSE_TYPE = SaslHandShakeResponse_v0
368-
SCHEMA = Schema(
369-
('mechanism', String('utf-8'))
370-
)
371-
372-
373-
class SaslHandShakeRequest_v1(Request):
374-
API_KEY = 17
375-
API_VERSION = 1
376-
RESPONSE_TYPE = SaslHandShakeResponse_v1
377-
SCHEMA = SaslHandShakeRequest_v0.SCHEMA
378-
379-
380-
SaslHandShakeRequest = [SaslHandShakeRequest_v0, SaslHandShakeRequest_v1]
381-
SaslHandShakeResponse = [SaslHandShakeResponse_v0, SaslHandShakeResponse_v1]
382-
383-
384349
class DescribeAclsResponse_v0(Response):
385350
API_KEY = 29
386351
API_VERSION = 0

kafka/protocol/sasl_authenticate.py

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
from __future__ import absolute_import
2+
3+
from kafka.protocol.api import Request, Response
4+
from kafka.protocol.types import Array, Bytes, Int16, Int64, Schema, String
5+
6+
7+
class SaslAuthenticateResponse_v0(Response):
8+
API_KEY = 36
9+
API_VERSION = 0
10+
SCHEMA = Schema(
11+
('error_code', Int16),
12+
('error_message', String('utf-8')),
13+
('auth_bytes', Bytes))
14+
15+
16+
class SaslAuthenticateResponse_v1(Response):
17+
API_KEY = 36
18+
API_VERSION = 1
19+
SCHEMA = Schema(
20+
('error_code', Int16),
21+
('error_message', String('utf-8')),
22+
('auth_bytes', Bytes),
23+
('session_lifetime_ms', Int64))
24+
25+
26+
class SaslAuthenticateRequest_v0(Request):
27+
API_KEY = 36
28+
API_VERSION = 0
29+
RESPONSE_TYPE = SaslAuthenticateResponse_v0
30+
SCHEMA = Schema(
31+
('auth_bytes', Bytes))
32+
33+
34+
class SaslAuthenticateRequest_v1(Request):
35+
API_KEY = 36
36+
API_VERSION = 1
37+
RESPONSE_TYPE = SaslAuthenticateResponse_v1
38+
SCHEMA = SaslAuthenticateRequest_v0.SCHEMA
39+
40+
41+
SaslAuthenticateRequest = [SaslAuthenticateRequest_v0, SaslAuthenticateRequest_v1]
42+
SaslAuthenticateResponse = [SaslAuthenticateResponse_v0, SaslAuthenticateResponse_v1]

kafka/protocol/sasl_handshake.py

+39
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
from __future__ import absolute_import
2+
3+
from kafka.protocol.api import Request, Response
4+
from kafka.protocol.types import Array, Int16, Schema, String
5+
6+
7+
class SaslHandshakeResponse_v0(Response):
8+
API_KEY = 17
9+
API_VERSION = 0
10+
SCHEMA = Schema(
11+
('error_code', Int16),
12+
('enabled_mechanisms', Array(String('utf-8')))
13+
)
14+
15+
16+
class SaslHandshakeResponse_v1(Response):
17+
API_KEY = 17
18+
API_VERSION = 1
19+
SCHEMA = SaslHandshakeResponse_v0.SCHEMA
20+
21+
22+
class SaslHandshakeRequest_v0(Request):
23+
API_KEY = 17
24+
API_VERSION = 0
25+
RESPONSE_TYPE = SaslHandshakeResponse_v0
26+
SCHEMA = Schema(
27+
('mechanism', String('utf-8'))
28+
)
29+
30+
31+
class SaslHandshakeRequest_v1(Request):
32+
API_KEY = 17
33+
API_VERSION = 1
34+
RESPONSE_TYPE = SaslHandshakeResponse_v1
35+
SCHEMA = SaslHandshakeRequest_v0.SCHEMA
36+
37+
38+
SaslHandshakeRequest = [SaslHandshakeRequest_v0, SaslHandshakeRequest_v1]
39+
SaslHandshakeResponse = [SaslHandshakeResponse_v0, SaslHandshakeResponse_v1]

kafka/sasl/__init__.py

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
from __future__ import absolute_import
2+
3+
from kafka.sasl.gssapi import SaslMechanismGSSAPI
4+
from kafka.sasl.oauth import SaslMechanismOAuth
5+
from kafka.sasl.plain import SaslMechanismPlain
6+
from kafka.sasl.scram import SaslMechanismScram
7+
8+
9+
SASL_MECHANISMS = {}
10+
11+
12+
def register_sasl_mechanism(name, klass, overwrite=False):
13+
if not overwrite and name in SASL_MECHANISMS:
14+
raise ValueError('Sasl mechanism %s already defined!' % name)
15+
SASL_MECHANISMS[name] = klass
16+
17+
18+
def get_sasl_mechanism(name):
19+
return SASL_MECHANISMS[name]
20+
21+
22+
register_sasl_mechanism('GSSAPI', SaslMechanismGSSAPI)
23+
register_sasl_mechanism('OAUTHBEARER', SaslMechanismOAuth)
24+
register_sasl_mechanism('PLAIN', SaslMechanismPlain)
25+
register_sasl_mechanism('SCRAM-SHA-256', SaslMechanismScram)
26+
register_sasl_mechanism('SCRAM-SHA-512', SaslMechanismScram)

kafka/sasl/abc.py

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
from __future__ import absolute_import
2+
3+
import abc
4+
5+
6+
class SaslMechanism(object):
7+
__metaclass__ = abc.ABCMeta
8+
9+
@abc.abstractmethod
10+
def __init__(self, **config):
11+
pass
12+
13+
@abc.abstractmethod
14+
def auth_bytes(self):
15+
pass
16+
17+
@abc.abstractmethod
18+
def receive(self, auth_bytes):
19+
pass
20+
21+
@abc.abstractmethod
22+
def is_done(self):
23+
pass
24+
25+
@abc.abstractmethod
26+
def is_authenticated(self):
27+
pass

kafka/sasl/gssapi.py

+73
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
from __future__ import absolute_import
2+
3+
# needed for SASL_GSSAPI authentication:
4+
try:
5+
import gssapi
6+
from gssapi.raw.misc import GSSError
7+
except (ImportError, OSError):
8+
#no gssapi available, will disable gssapi mechanism
9+
gssapi = None
10+
GSSError = None
11+
12+
from kafka.sasl.abc import SaslMechanism
13+
14+
15+
class SaslMechanismGSSAPI(SaslMechanism):
16+
# Establish security context and negotiate protection level
17+
# For reference RFC 2222, section 7.2.1
18+
19+
SASL_QOP_AUTH = 1
20+
SASL_QOP_AUTH_INT = 2
21+
SASL_QOP_AUTH_CONF = 4
22+
23+
def __init__(self, **config):
24+
assert gssapi is not None, 'GSSAPI lib not available'
25+
assert config['sasl_kerberos_service_name'] is not None, 'sasl_kerberos_service_name required for GSSAPI sasl'
26+
self._is_done = False
27+
self._is_authenticated = False
28+
self.kerberos_damin_name = config['sasl_kerberos_domain_name'] or config['host']
29+
self.auth_id = config['sasl_kerberos_service_name'] + '@' + kerberos_damin_name
30+
self.gssapi_name = gssapi.Name(auth_id, name_type=gssapi.NameType.hostbased_service).canonicalize(gssapi.MechType.kerberos)
31+
self._client_ctx = gssapi.SecurityContext(name=self.gssapi_name, usage='initiate')
32+
self._next_token = self._client_ctx.step(None)
33+
34+
def auth_bytes(self):
35+
# GSSAPI Auth does not have a final broker->client message
36+
# so mark is_done after the final auth_bytes are provided
37+
# in practice we'll still receive a response when using SaslAuthenticate
38+
# but not when using the prior unframed approach.
39+
if self._client_ctx.complete:
40+
self._is_done = True
41+
self._is_authenticated = True
42+
return self._next_token or b''
43+
44+
def receive(self, auth_bytes):
45+
if not self._client_ctx.complete:
46+
# The server will send a token back. Processing of this token either
47+
# establishes a security context, or it needs further token exchange.
48+
# The gssapi will be able to identify the needed next step.
49+
self._next_token = self._client_ctx.step(auth_bytes)
50+
elif self._is_done:
51+
# The final step of gssapi is send, so we do not expect any additional bytes
52+
# however, allow an empty message to support SaslAuthenticate response
53+
if auth_bytes != b'':
54+
raise ValueError("Unexpected receive auth_bytes after sasl/gssapi completion")
55+
else:
56+
# unwraps message containing supported protection levels and msg size
57+
msg = client_ctx.unwrap(received_token).message
58+
# Kafka currently doesn't support integrity or confidentiality security layers, so we
59+
# simply set QoP to 'auth' only (first octet). We reuse the max message size proposed
60+
# by the server
61+
message_parts = [
62+
Int8.encode(self.SASL_QOP_AUTH & Int8.decode(io.BytesIO(msg[0:1]))),
63+
msg[:1],
64+
self.auth_id.encode(),
65+
]
66+
# add authorization identity to the response, and GSS-wrap
67+
self._next_token = self._client_ctx.wrap(b''.join(message_parts), False).message
68+
69+
def is_done(self):
70+
return self._is_done
71+
72+
def is_authenticated(self):
73+
return self._is_authenticated

kafka/sasl/oauth.py

+39
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
from __future__ import absolute_import
2+
3+
from kafka.sasl.abc import SaslMechanism
4+
5+
6+
class SaslMechanismOAuth(SaslMechanism):
7+
8+
def __init__(self, **config):
9+
self.token_provider = config['sasl_oauth_token_provider']
10+
assert self.token_provider is not None, 'sasl_oauth_token_provider required for OAUTHBEARER sasl'
11+
assert callable(getattr(self.token_provider, 'token', None)), 'sasl_oauth_token_provider must implement method #token()'
12+
self._is_done = False
13+
self._is_authenticated = False
14+
15+
def auth_bytes(self):
16+
token = self.token_provider.token()
17+
extensions = self._token_extensions()
18+
return "n,,\x01auth=Bearer {}{}\x01\x01".format(token, extensions).encode('utf-8')
19+
20+
def receive(self, auth_bytes):
21+
self._is_done = True
22+
self._is_authenticated = auth_bytes == b''
23+
24+
def is_done(self):
25+
return self._is_done
26+
27+
def is_authenticated(self):
28+
return self._is_authenticated
29+
30+
def _token_extensions(self):
31+
"""
32+
Return a string representation of the OPTIONAL key-value pairs that can be sent with an OAUTHBEARER
33+
initial request.
34+
"""
35+
# Only run if the #extensions() method is implemented by the clients Token Provider class
36+
# Builds up a string separated by \x01 via a dict of key value pairs
37+
extensions = getattr(self.token_provider, 'extensions', lambda: [])()
38+
msg = '\x01'.join(['{}={}'.format(k, v) for k, v in extensions.items()])
39+
return '\x01' + msg if msg else ''

kafka/sasl/plain.py

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
from __future__ import absolute_import
2+
3+
import logging
4+
5+
from kafka.sasl.abc import SaslMechanism
6+
7+
8+
log = logging.getLogger(__name__)
9+
10+
11+
class SaslMechanismPlain(SaslMechanism):
12+
13+
def __init__(self, **config):
14+
if config['security_protocol'] == 'SASL_PLAINTEXT':
15+
log.warning('Sending username and password in the clear')
16+
assert config['sasl_plain_username'] is not None, 'sasl_plain_username required for PLAIN sasl'
17+
assert config['sasl_plain_password'] is not None, 'sasl_plain_password required for PLAIN sasl'
18+
19+
self.username = config['sasl_plain_username']
20+
self.password = config['sasl_plain_password']
21+
self._is_done = False
22+
self._is_authenticated = False
23+
24+
def auth_bytes(self):
25+
# Send PLAIN credentials per RFC-4616
26+
return bytes('\0'.join([self.username, self.username, self.password]).encode('utf-8'))
27+
28+
def receive(self, auth_bytes):
29+
self._is_done = True
30+
self._is_authenticated = auth_bytes == b''
31+
32+
def is_done(self):
33+
return self._is_done
34+
35+
def is_authenticated(self):
36+
return self._is_authenticated

0 commit comments

Comments
 (0)