From b625692402858db6a97da2fd579643fda39608f0 Mon Sep 17 00:00:00 2001 From: Matt Oberle Date: Wed, 18 Aug 2021 13:38:37 -0400 Subject: [PATCH 1/7] Support AWS_MSK_IAM authentication Adds an AWS_MSK_IAM authentication mechanism which is described here: * https://github.com/aws/aws-msk-iam-auth#uriencode To use the mechanism pass the following keyword arguments when initializing a class: ``` security_protocol='SASL_SSL', sasl_mechanism='AWS_MSK_IAM', bootstrap_servers=[ 'b-1.cluster.x.y.kafka.region.amazonaws.com:9088', ... ], ``` The credentials and region will be pulled using `botocore.session.Session`. Using the mechanism requires the `botocore` library which can be installed with: ```sh pip install botocore ``` **TODO:** - [ ] Documentation - [ ] Tests - [ ] Refresh mechanism for temporary credentials? --- kafka/msk.py | 183 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 183 insertions(+) create mode 100644 kafka/msk.py diff --git a/kafka/msk.py b/kafka/msk.py new file mode 100644 index 000000000..7c772b265 --- /dev/null +++ b/kafka/msk.py @@ -0,0 +1,183 @@ +import datetime +import hashlib +import hmac +import json +import string +import urllib.parse + + +class AwsMskIamClient: + UNRESERVED_CHARS = string.ascii_letters + string.digits + '-._~' + + def __init__(self, host, access_key, secret_key, region, token=None): + """ + Arguments: + host (str): The hostname of the broker. + access_key (str): An AWS_ACCESS_KEY_ID. + secret_key (str): An AWS_SECRET_ACCESS_KEY. + region (str): An AWS_REGION. + token (Optional[str]): An AWS_SESSION_TOKEN if using temporary + credentials. + """ + self.algorithm = 'AWS4-HMAC-SHA256' + self.expires = '900' + self.hashfunc = hashlib.sha256 + self.headers = [ + ('host', host) + ] + self.version = '2020_10_22' + + self.service = 'kafka-cluster' + self.action = '{}:Connect'.format(self.service) + + now = datetime.datetime.utcnow() + self.datestamp = now.strftime('%Y%m%d') + self.timestamp = now.strftime('%Y%m%dT%H%M%SZ') + + self.host = host + self.access_key = access_key + self.secret_key = secret_key + self.region = region + self.token = token + + @property + def _credential(self): + return '{0.access_key}/{0._scope}'.format(self) + + @property + def _scope(self): + return '{0.datestamp}/{0.region}/{0.service}/aws4_request'.format(self) + + @property + def _signed_headers(self): + """ + Returns (str): + An alphabetically sorted, semicolon-delimited list of lowercase + request header names. + """ + return ';'.join(sorted(k.lower() for k, _ in self.headers)) + + @property + def _canonical_headers(self): + """ + Returns (str): + A newline-delited list of header names and values. + Header names are lowercased. + """ + return '\n'.join(map(':'.join, self.headers)) + '\n' + + @property + def _canonical_request(self): + """ + Returns (str): + An AWS Signature Version 4 canonical request in the format: + \n + \n + \n + \n + \n + + """ + # The hashed_payload is always an empty string for MSK. + hashed_payload = self.hashfunc(b'').hexdigest() + return '\n'.join(( + 'GET', + '/', + self._canonical_querystring, + self._canonical_headers, + self._signed_headers, + hashed_payload, + )) + + @property + def _canonical_querystring(self): + """ + Returns (str): + A '&'-separated list of URI-encoded key/value pairs. + """ + params = [] + params.append(('Action', self.action)) + params.append(('X-Amz-Algorithm', self.algorithm)) + params.append(('X-Amz-Credential', self._credential)) + params.append(('X-Amz-Date', self.timestamp)) + params.append(('X-Amz-Expires', self.expires)) + if self.token: + params.append(('X-Amz-Security-Token', self.token)) + params.append(('X-Amz-SignedHeaders', self._signed_headers)) + + return '&'.join(self._uriencode(k) + '=' + self._uriencode(v) for k, v in params) + + @property + def _signing_key(self): + """ + Returns (bytes): + An AWS Signature V4 signing key generated from the secret_key, date, + region, service, and request type. + """ + key = self._hmac(('AWS4' + self.secret_key).encode('utf-8'), self.datestamp) + key = self._hmac(key, self.region) + key = self._hmac(key, self.service) + key = self._hmac(key, 'aws4_request') + return key + + @property + def _signing_str(self): + """ + Returns (str): + A string used to sign the AWS Signature V4 payload in the format: + \n + \n + \n + + """ + canonical_request_hash = self.hashfunc(self._canonical_request.encode('utf-8')).hexdigest() + return '\n'.join((self.algorithm, self.timestamp, self._scope, canonical_request_hash)) + + def _uriencode(self, msg): + """ + Arguments: + msg (str): A string to URI-encode. + + Returns (str): + The URI-encoded version of the provided msg, following the encoding + rules specified: https://github.com/aws/aws-msk-iam-auth#uriencode + """ + return urllib.parse.quote(msg, safe=self.UNRESERVED_CHARS) + + def _hmac(self, key, msg): + """ + Arguments: + key (bytes): A key to use for the HMAC digest. + msg (str): A value to include in the HMAC digest. + Returns (bytes): + An HMAC digest of the given key and msg. + """ + return hmac.new(key, msg.encode('utf-8'), digestmod=self.hashfunc).digest() + + def first_message(self): + """ + Returns (bytes): + An encoded JSON authentication payload that can be sent to the + broker. + """ + signature = hmac.new( + self._signing_key, + self._signing_str.encode('utf-8'), + digestmod=self.hashfunc, + ).hexdigest() + msg = { + 'version': self.version, + 'host': self.host, + 'user-agent': 'kafka-python', + 'action': self.action, + 'x-amz-algorithm': self.algorithm, + 'x-amz-credential': self._credential, + 'x-amz-date': self.timestamp, + 'x-amz-signedheaders': self._signed_headers, + 'x-amz-expires': self.expires, + 'x-amz-signature': signature, + } + if self.token: + msg['x-amz-security-token'] = self.token + + return json.dumps(msg, separators=(',', ':')).encode('utf-8') From d573206b2cbfc32ba38968f9898c2dd246eba560 Mon Sep 17 00:00:00 2001 From: Matt Oberle Date: Wed, 18 Aug 2021 18:09:11 -0400 Subject: [PATCH 2/7] Use six to import urllib (python2 compatability) --- kafka/msk.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kafka/msk.py b/kafka/msk.py index 7c772b265..764b5d0dc 100644 --- a/kafka/msk.py +++ b/kafka/msk.py @@ -3,7 +3,8 @@ import hmac import json import string -import urllib.parse + +from kafka.vendor.six.moves import urllib class AwsMskIamClient: From 14dbc925de85f4e3758b1b9e8e9abf3efea1b6a8 Mon Sep 17 00:00:00 2001 From: Matt Oberle Date: Wed, 18 Aug 2021 18:10:35 -0400 Subject: [PATCH 3/7] Tests AWS_MSK_IAM signature and payload generation The two tests in `test/test_msk.py` should ensure that the changes to `kafka/msk.py` do not break the authentication payload. The authentication payload was validated using a real AWS Kafka cluster before adding tests with the hard-coded signatures. --- test/test_msk.py | 67 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 test/test_msk.py diff --git a/test/test_msk.py b/test/test_msk.py new file mode 100644 index 000000000..d4620e39e --- /dev/null +++ b/test/test_msk.py @@ -0,0 +1,67 @@ +import datetime +import json + +from kafka.msk import AwsMskIamClient + +try: + from unittest import mock +except ImportError: + import mock + + +def client_factory(token=None): + now = datetime.datetime.utcfromtimestamp(1629321911) + with mock.patch('kafka.msk.datetime') as mock_dt: + mock_dt.datetime.utcnow = mock.Mock(return_value=now) + return AwsMskIamClient( + host='localhost', + access_key='XXXXXXXXXXXXXXXXXXXX', + secret_key='XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX', + region='us-east-1', + token=token, + ) + + +def test_aws_msk_iam_client_permanent_credentials(): + client = client_factory(token=None) + msg = client.first_message() + assert msg + assert isinstance(msg, bytes) + actual = json.loads(msg) + + expected = { + 'version': '2020_10_22', + 'host': 'localhost', + 'user-agent': 'kafka-python', + 'action': 'kafka-cluster:Connect', + 'x-amz-algorithm': 'AWS4-HMAC-SHA256', + 'x-amz-credential': 'XXXXXXXXXXXXXXXXXXXX/20210818/us-east-1/kafka-cluster/aws4_request', + 'x-amz-date': '20210818T212511Z', + 'x-amz-signedheaders': 'host', + 'x-amz-expires': '900', + 'x-amz-signature': '0fa42ae3d5693777942a7a4028b564f0b372bafa2f71c1a19ad60680e6cb994b', + } + assert actual == expected + + +def test_aws_msk_iam_client_temporary_credentials(): + client = client_factory(token='XXXXX') + msg = client.first_message() + assert msg + assert isinstance(msg, bytes) + actual = json.loads(msg) + + expected = { + 'version': '2020_10_22', + 'host': 'localhost', + 'user-agent': 'kafka-python', + 'action': 'kafka-cluster:Connect', + 'x-amz-algorithm': 'AWS4-HMAC-SHA256', + 'x-amz-credential': 'XXXXXXXXXXXXXXXXXXXX/20210818/us-east-1/kafka-cluster/aws4_request', + 'x-amz-date': '20210818T212511Z', + 'x-amz-signedheaders': 'host', + 'x-amz-expires': '900', + 'x-amz-signature': 'b0619c50b7ecb4a7f6f92bd5f733770df5710e97b25146f97015c0b1db783b05', + 'x-amz-security-token': 'XXXXX', + } + assert actual == expected From 04a0087e1ba0f6d54fb8c44af1a27125598bf8ab Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 11 Mar 2025 12:24:49 -0700 Subject: [PATCH 4/7] Convert msk to sasl module --- kafka/sasl/__init__.py | 2 ++ kafka/{ => sasl}/msk.py | 45 +++++++++++++++++++++++++++++++++++++ test/{ => sasl}/test_msk.py | 4 ++-- 3 files changed, 49 insertions(+), 2 deletions(-) rename kafka/{ => sasl}/msk.py (80%) rename test/{ => sasl}/test_msk.py (95%) diff --git a/kafka/sasl/__init__.py b/kafka/sasl/__init__.py index e36d1dfbd..8677f60d2 100644 --- a/kafka/sasl/__init__.py +++ b/kafka/sasl/__init__.py @@ -1,6 +1,7 @@ from __future__ import absolute_import from kafka.sasl.gssapi import SaslMechanismGSSAPI +from kafka.sasl.msk import SaslMechanismAwsMskIam from kafka.sasl.oauth import SaslMechanismOAuth from kafka.sasl.plain import SaslMechanismPlain from kafka.sasl.scram import SaslMechanismScram @@ -24,3 +25,4 @@ def get_sasl_mechanism(name): register_sasl_mechanism('PLAIN', SaslMechanismPlain) register_sasl_mechanism('SCRAM-SHA-256', SaslMechanismScram) register_sasl_mechanism('SCRAM-SHA-512', SaslMechanismScram) +register_sasl_mechanism('AWS_MSK_IAM', SaslMechanismAwsMskIam) diff --git a/kafka/msk.py b/kafka/sasl/msk.py similarity index 80% rename from kafka/msk.py rename to kafka/sasl/msk.py index 764b5d0dc..5de49d002 100644 --- a/kafka/msk.py +++ b/kafka/sasl/msk.py @@ -1,12 +1,57 @@ +from __future__ import absolute_import + import datetime import hashlib import hmac import json +import logging import string +# needed for AWS_MSK_IAM authentication: +try: + from botocore.session import Session as BotoSession +except ImportError: + # no botocore available, will disable AWS_MSK_IAM mechanism + BotoSession = None + +from kafka.sasl.abc import SaslMechanism from kafka.vendor.six.moves import urllib +log = logging.getLogger(__name__) + + +class SaslMechanismAwsMskIam(SaslMechanism): + def __init__(self, **config): + assert BotoSession is not None, 'AWS_MSK_IAM requires the "botocore" package' + assert config['security_protocol'] == 'SASL_SSL', 'AWS_MSK_IAM requires SASL_SSL' + self._is_done = False + self._is_authenticated = False + + def auth_bytes(self): + session = BotoSession() + credentials = session.get_credentials().get_frozen_credentials() + client = AwsMskIamClient( + host=self.host, + access_key=credentials.access_key, + secret_key=credentials.secret_key, + region=session.get_config_variable('region'), + token=credentials.token, + ) + return client.first_message() + + def receive(self, auth_bytes): + self._is_done = True + self._is_authenticated = auth_bytes != b'' + log.info('Authenticated via AWS_MSK_IAM %s', auth_bytes.decode('utf-8')) + + def is_done(self): + return self._is_done + + def is_authenticated(self): + return self._is_authenticated + + class AwsMskIamClient: UNRESERVED_CHARS = string.ascii_letters + string.digits + '-._~' diff --git a/test/test_msk.py b/test/sasl/test_msk.py similarity index 95% rename from test/test_msk.py rename to test/sasl/test_msk.py index d4620e39e..297ca84ce 100644 --- a/test/test_msk.py +++ b/test/sasl/test_msk.py @@ -1,7 +1,7 @@ import datetime import json -from kafka.msk import AwsMskIamClient +from kafka.sasl.msk import AwsMskIamClient try: from unittest import mock @@ -11,7 +11,7 @@ def client_factory(token=None): now = datetime.datetime.utcfromtimestamp(1629321911) - with mock.patch('kafka.msk.datetime') as mock_dt: + with mock.patch('kafka.sasl.msk.datetime') as mock_dt: mock_dt.datetime.utcnow = mock.Mock(return_value=now) return AwsMskIamClient( host='localhost', From 875b42041dec9ec5a0e91a2a1bd9b982e5eba5b1 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 11 Mar 2025 14:46:01 -0700 Subject: [PATCH 5/7] avoid KeyError in config validation --- kafka/sasl/msk.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/sasl/msk.py b/kafka/sasl/msk.py index 5de49d002..c1af6ee31 100644 --- a/kafka/sasl/msk.py +++ b/kafka/sasl/msk.py @@ -24,7 +24,7 @@ class SaslMechanismAwsMskIam(SaslMechanism): def __init__(self, **config): assert BotoSession is not None, 'AWS_MSK_IAM requires the "botocore" package' - assert config['security_protocol'] == 'SASL_SSL', 'AWS_MSK_IAM requires SASL_SSL' + assert config.get('security_protocol', '') == 'SASL_SSL', 'AWS_MSK_IAM requires SASL_SSL' self._is_done = False self._is_authenticated = False From 1484074e947b055c95381d509825731e9e6b5e86 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 11 Mar 2025 15:30:04 -0700 Subject: [PATCH 6/7] prefer auth_details() over log --- kafka/sasl/msk.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/kafka/sasl/msk.py b/kafka/sasl/msk.py index c1af6ee31..24c53b496 100644 --- a/kafka/sasl/msk.py +++ b/kafka/sasl/msk.py @@ -4,7 +4,6 @@ import hashlib import hmac import json -import logging import string # needed for AWS_MSK_IAM authentication: @@ -18,9 +17,6 @@ from kafka.vendor.six.moves import urllib -log = logging.getLogger(__name__) - - class SaslMechanismAwsMskIam(SaslMechanism): def __init__(self, **config): assert BotoSession is not None, 'AWS_MSK_IAM requires the "botocore" package' @@ -43,7 +39,6 @@ def auth_bytes(self): def receive(self, auth_bytes): self._is_done = True self._is_authenticated = auth_bytes != b'' - log.info('Authenticated via AWS_MSK_IAM %s', auth_bytes.decode('utf-8')) def is_done(self): return self._is_done @@ -51,6 +46,11 @@ def is_done(self): def is_authenticated(self): return self._is_authenticated + def auth_details(self): + if not self.is_authenticated: + raise RuntimeError('Not authenticated yet!') + return 'Authenticated via SASL / AWS_MSK_IAM %s' % (auth_bytes.decode('utf-8'),) + class AwsMskIamClient: UNRESERVED_CHARS = string.ascii_letters + string.digits + '-._~' From 5baed8a9ef5c84baa1b2001a90c72bf51edbe857 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 11 Mar 2025 16:23:21 -0700 Subject: [PATCH 7/7] fixups --- kafka/sasl/msk.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/kafka/sasl/msk.py b/kafka/sasl/msk.py index 24c53b496..db56b4801 100644 --- a/kafka/sasl/msk.py +++ b/kafka/sasl/msk.py @@ -21,6 +21,9 @@ class SaslMechanismAwsMskIam(SaslMechanism): def __init__(self, **config): assert BotoSession is not None, 'AWS_MSK_IAM requires the "botocore" package' assert config.get('security_protocol', '') == 'SASL_SSL', 'AWS_MSK_IAM requires SASL_SSL' + assert 'host' in config, 'AWS_MSK_IAM requires host configuration' + self.host = config['host'] + self._auth = None self._is_done = False self._is_authenticated = False @@ -39,6 +42,7 @@ def auth_bytes(self): def receive(self, auth_bytes): self._is_done = True self._is_authenticated = auth_bytes != b'' + self._auth = auth_bytes.deode('utf-8') def is_done(self): return self._is_done @@ -49,7 +53,7 @@ def is_authenticated(self): def auth_details(self): if not self.is_authenticated: raise RuntimeError('Not authenticated yet!') - return 'Authenticated via SASL / AWS_MSK_IAM %s' % (auth_bytes.decode('utf-8'),) + return 'Authenticated via SASL / AWS_MSK_IAM %s' % (self._auth,) class AwsMskIamClient: