Skip to content

Commit 02dd98f

Browse files
dpkpmattoberle
andauthored
Support AWS_MSK_IAM authentication (#2519)
Co-authored-by: Matt Oberle <[email protected]>
1 parent 0ae708a commit 02dd98f

File tree

3 files changed

+302
-0
lines changed

3 files changed

+302
-0
lines changed

Diff for: kafka/sasl/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import absolute_import
22

33
from kafka.sasl.gssapi import SaslMechanismGSSAPI
4+
from kafka.sasl.msk import SaslMechanismAwsMskIam
45
from kafka.sasl.oauth import SaslMechanismOAuth
56
from kafka.sasl.plain import SaslMechanismPlain
67
from kafka.sasl.scram import SaslMechanismScram
@@ -24,3 +25,4 @@ def get_sasl_mechanism(name):
2425
register_sasl_mechanism('PLAIN', SaslMechanismPlain)
2526
register_sasl_mechanism('SCRAM-SHA-256', SaslMechanismScram)
2627
register_sasl_mechanism('SCRAM-SHA-512', SaslMechanismScram)
28+
register_sasl_mechanism('AWS_MSK_IAM', SaslMechanismAwsMskIam)

Diff for: kafka/sasl/msk.py

+233
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
from __future__ import absolute_import
2+
3+
import datetime
4+
import hashlib
5+
import hmac
6+
import json
7+
import string
8+
9+
# needed for AWS_MSK_IAM authentication:
10+
try:
11+
from botocore.session import Session as BotoSession
12+
except ImportError:
13+
# no botocore available, will disable AWS_MSK_IAM mechanism
14+
BotoSession = None
15+
16+
from kafka.sasl.abc import SaslMechanism
17+
from kafka.vendor.six.moves import urllib
18+
19+
20+
class SaslMechanismAwsMskIam(SaslMechanism):
21+
def __init__(self, **config):
22+
assert BotoSession is not None, 'AWS_MSK_IAM requires the "botocore" package'
23+
assert config.get('security_protocol', '') == 'SASL_SSL', 'AWS_MSK_IAM requires SASL_SSL'
24+
assert 'host' in config, 'AWS_MSK_IAM requires host configuration'
25+
self.host = config['host']
26+
self._auth = None
27+
self._is_done = False
28+
self._is_authenticated = False
29+
30+
def auth_bytes(self):
31+
session = BotoSession()
32+
credentials = session.get_credentials().get_frozen_credentials()
33+
client = AwsMskIamClient(
34+
host=self.host,
35+
access_key=credentials.access_key,
36+
secret_key=credentials.secret_key,
37+
region=session.get_config_variable('region'),
38+
token=credentials.token,
39+
)
40+
return client.first_message()
41+
42+
def receive(self, auth_bytes):
43+
self._is_done = True
44+
self._is_authenticated = auth_bytes != b''
45+
self._auth = auth_bytes.deode('utf-8')
46+
47+
def is_done(self):
48+
return self._is_done
49+
50+
def is_authenticated(self):
51+
return self._is_authenticated
52+
53+
def auth_details(self):
54+
if not self.is_authenticated:
55+
raise RuntimeError('Not authenticated yet!')
56+
return 'Authenticated via SASL / AWS_MSK_IAM %s' % (self._auth,)
57+
58+
59+
class AwsMskIamClient:
60+
UNRESERVED_CHARS = string.ascii_letters + string.digits + '-._~'
61+
62+
def __init__(self, host, access_key, secret_key, region, token=None):
63+
"""
64+
Arguments:
65+
host (str): The hostname of the broker.
66+
access_key (str): An AWS_ACCESS_KEY_ID.
67+
secret_key (str): An AWS_SECRET_ACCESS_KEY.
68+
region (str): An AWS_REGION.
69+
token (Optional[str]): An AWS_SESSION_TOKEN if using temporary
70+
credentials.
71+
"""
72+
self.algorithm = 'AWS4-HMAC-SHA256'
73+
self.expires = '900'
74+
self.hashfunc = hashlib.sha256
75+
self.headers = [
76+
('host', host)
77+
]
78+
self.version = '2020_10_22'
79+
80+
self.service = 'kafka-cluster'
81+
self.action = '{}:Connect'.format(self.service)
82+
83+
now = datetime.datetime.utcnow()
84+
self.datestamp = now.strftime('%Y%m%d')
85+
self.timestamp = now.strftime('%Y%m%dT%H%M%SZ')
86+
87+
self.host = host
88+
self.access_key = access_key
89+
self.secret_key = secret_key
90+
self.region = region
91+
self.token = token
92+
93+
@property
94+
def _credential(self):
95+
return '{0.access_key}/{0._scope}'.format(self)
96+
97+
@property
98+
def _scope(self):
99+
return '{0.datestamp}/{0.region}/{0.service}/aws4_request'.format(self)
100+
101+
@property
102+
def _signed_headers(self):
103+
"""
104+
Returns (str):
105+
An alphabetically sorted, semicolon-delimited list of lowercase
106+
request header names.
107+
"""
108+
return ';'.join(sorted(k.lower() for k, _ in self.headers))
109+
110+
@property
111+
def _canonical_headers(self):
112+
"""
113+
Returns (str):
114+
A newline-delited list of header names and values.
115+
Header names are lowercased.
116+
"""
117+
return '\n'.join(map(':'.join, self.headers)) + '\n'
118+
119+
@property
120+
def _canonical_request(self):
121+
"""
122+
Returns (str):
123+
An AWS Signature Version 4 canonical request in the format:
124+
<Method>\n
125+
<Path>\n
126+
<CanonicalQueryString>\n
127+
<CanonicalHeaders>\n
128+
<SignedHeaders>\n
129+
<HashedPayload>
130+
"""
131+
# The hashed_payload is always an empty string for MSK.
132+
hashed_payload = self.hashfunc(b'').hexdigest()
133+
return '\n'.join((
134+
'GET',
135+
'/',
136+
self._canonical_querystring,
137+
self._canonical_headers,
138+
self._signed_headers,
139+
hashed_payload,
140+
))
141+
142+
@property
143+
def _canonical_querystring(self):
144+
"""
145+
Returns (str):
146+
A '&'-separated list of URI-encoded key/value pairs.
147+
"""
148+
params = []
149+
params.append(('Action', self.action))
150+
params.append(('X-Amz-Algorithm', self.algorithm))
151+
params.append(('X-Amz-Credential', self._credential))
152+
params.append(('X-Amz-Date', self.timestamp))
153+
params.append(('X-Amz-Expires', self.expires))
154+
if self.token:
155+
params.append(('X-Amz-Security-Token', self.token))
156+
params.append(('X-Amz-SignedHeaders', self._signed_headers))
157+
158+
return '&'.join(self._uriencode(k) + '=' + self._uriencode(v) for k, v in params)
159+
160+
@property
161+
def _signing_key(self):
162+
"""
163+
Returns (bytes):
164+
An AWS Signature V4 signing key generated from the secret_key, date,
165+
region, service, and request type.
166+
"""
167+
key = self._hmac(('AWS4' + self.secret_key).encode('utf-8'), self.datestamp)
168+
key = self._hmac(key, self.region)
169+
key = self._hmac(key, self.service)
170+
key = self._hmac(key, 'aws4_request')
171+
return key
172+
173+
@property
174+
def _signing_str(self):
175+
"""
176+
Returns (str):
177+
A string used to sign the AWS Signature V4 payload in the format:
178+
<Algorithm>\n
179+
<Timestamp>\n
180+
<Scope>\n
181+
<CanonicalRequestHash>
182+
"""
183+
canonical_request_hash = self.hashfunc(self._canonical_request.encode('utf-8')).hexdigest()
184+
return '\n'.join((self.algorithm, self.timestamp, self._scope, canonical_request_hash))
185+
186+
def _uriencode(self, msg):
187+
"""
188+
Arguments:
189+
msg (str): A string to URI-encode.
190+
191+
Returns (str):
192+
The URI-encoded version of the provided msg, following the encoding
193+
rules specified: https://github.com/aws/aws-msk-iam-auth#uriencode
194+
"""
195+
return urllib.parse.quote(msg, safe=self.UNRESERVED_CHARS)
196+
197+
def _hmac(self, key, msg):
198+
"""
199+
Arguments:
200+
key (bytes): A key to use for the HMAC digest.
201+
msg (str): A value to include in the HMAC digest.
202+
Returns (bytes):
203+
An HMAC digest of the given key and msg.
204+
"""
205+
return hmac.new(key, msg.encode('utf-8'), digestmod=self.hashfunc).digest()
206+
207+
def first_message(self):
208+
"""
209+
Returns (bytes):
210+
An encoded JSON authentication payload that can be sent to the
211+
broker.
212+
"""
213+
signature = hmac.new(
214+
self._signing_key,
215+
self._signing_str.encode('utf-8'),
216+
digestmod=self.hashfunc,
217+
).hexdigest()
218+
msg = {
219+
'version': self.version,
220+
'host': self.host,
221+
'user-agent': 'kafka-python',
222+
'action': self.action,
223+
'x-amz-algorithm': self.algorithm,
224+
'x-amz-credential': self._credential,
225+
'x-amz-date': self.timestamp,
226+
'x-amz-signedheaders': self._signed_headers,
227+
'x-amz-expires': self.expires,
228+
'x-amz-signature': signature,
229+
}
230+
if self.token:
231+
msg['x-amz-security-token'] = self.token
232+
233+
return json.dumps(msg, separators=(',', ':')).encode('utf-8')

Diff for: test/sasl/test_msk.py

+67
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
import datetime
2+
import json
3+
4+
from kafka.sasl.msk import AwsMskIamClient
5+
6+
try:
7+
from unittest import mock
8+
except ImportError:
9+
import mock
10+
11+
12+
def client_factory(token=None):
13+
now = datetime.datetime.utcfromtimestamp(1629321911)
14+
with mock.patch('kafka.sasl.msk.datetime') as mock_dt:
15+
mock_dt.datetime.utcnow = mock.Mock(return_value=now)
16+
return AwsMskIamClient(
17+
host='localhost',
18+
access_key='XXXXXXXXXXXXXXXXXXXX',
19+
secret_key='XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX',
20+
region='us-east-1',
21+
token=token,
22+
)
23+
24+
25+
def test_aws_msk_iam_client_permanent_credentials():
26+
client = client_factory(token=None)
27+
msg = client.first_message()
28+
assert msg
29+
assert isinstance(msg, bytes)
30+
actual = json.loads(msg)
31+
32+
expected = {
33+
'version': '2020_10_22',
34+
'host': 'localhost',
35+
'user-agent': 'kafka-python',
36+
'action': 'kafka-cluster:Connect',
37+
'x-amz-algorithm': 'AWS4-HMAC-SHA256',
38+
'x-amz-credential': 'XXXXXXXXXXXXXXXXXXXX/20210818/us-east-1/kafka-cluster/aws4_request',
39+
'x-amz-date': '20210818T212511Z',
40+
'x-amz-signedheaders': 'host',
41+
'x-amz-expires': '900',
42+
'x-amz-signature': '0fa42ae3d5693777942a7a4028b564f0b372bafa2f71c1a19ad60680e6cb994b',
43+
}
44+
assert actual == expected
45+
46+
47+
def test_aws_msk_iam_client_temporary_credentials():
48+
client = client_factory(token='XXXXX')
49+
msg = client.first_message()
50+
assert msg
51+
assert isinstance(msg, bytes)
52+
actual = json.loads(msg)
53+
54+
expected = {
55+
'version': '2020_10_22',
56+
'host': 'localhost',
57+
'user-agent': 'kafka-python',
58+
'action': 'kafka-cluster:Connect',
59+
'x-amz-algorithm': 'AWS4-HMAC-SHA256',
60+
'x-amz-credential': 'XXXXXXXXXXXXXXXXXXXX/20210818/us-east-1/kafka-cluster/aws4_request',
61+
'x-amz-date': '20210818T212511Z',
62+
'x-amz-signedheaders': 'host',
63+
'x-amz-expires': '900',
64+
'x-amz-signature': 'b0619c50b7ecb4a7f6f92bd5f733770df5710e97b25146f97015c0b1db783b05',
65+
'x-amz-security-token': 'XXXXX',
66+
}
67+
assert actual == expected

0 commit comments

Comments
 (0)