Skip to content

Commit 08f6273

Browse files
committed
Convert msk to sasl module
1 parent 220c09f commit 08f6273

File tree

3 files changed

+49
-2
lines changed

3 files changed

+49
-2
lines changed

Diff for: kafka/sasl/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import absolute_import
22

33
from kafka.sasl.gssapi import SaslMechanismGSSAPI
4+
from kafka.sasl.msk import SaslMechanismAwsMskIam
45
from kafka.sasl.oauth import SaslMechanismOAuth
56
from kafka.sasl.plain import SaslMechanismPlain
67
from kafka.sasl.scram import SaslMechanismScram
@@ -24,3 +25,4 @@ def get_sasl_mechanism(name):
2425
register_sasl_mechanism('PLAIN', SaslMechanismPlain)
2526
register_sasl_mechanism('SCRAM-SHA-256', SaslMechanismScram)
2627
register_sasl_mechanism('SCRAM-SHA-512', SaslMechanismScram)
28+
register_sasl_mechanism('AWS_MSK_IAM', SaslMechanismAwsMskIam)

Diff for: kafka/msk.py renamed to kafka/sasl/msk.py

+45
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,57 @@
1+
from __future__ import absolute_import
2+
13
import datetime
24
import hashlib
35
import hmac
46
import json
7+
import logging
58
import string
69

10+
# needed for AWS_MSK_IAM authentication:
11+
try:
12+
from botocore.session import Session as BotoSession
13+
except ImportError:
14+
# no botocore available, will disable AWS_MSK_IAM mechanism
15+
BotoSession = None
16+
17+
from kafka.sasl.abc import SaslMechanism
718
from kafka.vendor.six.moves import urllib
819

920

21+
log = logging.getLogger(__name__)
22+
23+
24+
class SaslMechanismAwsMskIam(SaslMechanism):
25+
def __init__(self, **config):
26+
assert BotoSession is not None, 'AWS_MSK_IAM requires the "botocore" package'
27+
assert config['security_protocol'] == 'SASL_SSL', 'AWS_MSK_IAM requires SASL_SSL'
28+
self._is_done = False
29+
self._is_authenticated = False
30+
31+
def auth_bytes(self):
32+
session = BotoSession()
33+
credentials = session.get_credentials().get_frozen_credentials()
34+
client = AwsMskIamClient(
35+
host=self.host,
36+
access_key=credentials.access_key,
37+
secret_key=credentials.secret_key,
38+
region=session.get_config_variable('region'),
39+
token=credentials.token,
40+
)
41+
return client.first_message()
42+
43+
def receive(self, auth_bytes):
44+
self._is_done = True
45+
self._is_authenticated = auth_bytes != b''
46+
log.info('Authenticated via AWS_MSK_IAM %s', auth_bytes.decode('utf-8'))
47+
48+
def is_done(self):
49+
return self._is_done
50+
51+
def is_authenticated(self):
52+
return self._is_authenticated
53+
54+
1055
class AwsMskIamClient:
1156
UNRESERVED_CHARS = string.ascii_letters + string.digits + '-._~'
1257

Diff for: test/test_msk.py renamed to test/sasl/test_msk.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import datetime
22
import json
33

4-
from kafka.msk import AwsMskIamClient
4+
from kafka.sasl.msk import AwsMskIamClient
55

66
try:
77
from unittest import mock
@@ -11,7 +11,7 @@
1111

1212
def client_factory(token=None):
1313
now = datetime.datetime.utcfromtimestamp(1629321911)
14-
with mock.patch('kafka.msk.datetime') as mock_dt:
14+
with mock.patch('kafka.sasl.msk.datetime') as mock_dt:
1515
mock_dt.datetime.utcnow = mock.Mock(return_value=now)
1616
return AwsMskIamClient(
1717
host='localhost',

0 commit comments

Comments
 (0)