Skip to content

[FEAT] Add support for JWT Authentication #20

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions DESCRIPTION.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ Features
- Supported authentication types:
- using a username and password
- using a refresh token
- using JWT
- Subscribe to and receive messages on:
- `PushTopics <PushTopic_>`_
- `Generic Streaming Channels <GenericStreaming_>`_
Expand Down
57 changes: 57 additions & 0 deletions aiosfstream/auth.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
"""Authenticatior class implementations"""
import time
from abc import abstractmethod
from http import HTTPStatus
import reprlib
import json
import jwt
from typing import Optional, Tuple

from aiocometd import AuthExtension
Expand Down Expand Up @@ -225,4 +227,59 @@ async def _authenticate(self) -> Tuple[int, JsonObject]:
response_data = await response.json(loads=self.json_loads)
return response.status, response_data


class JWTAuthenticator(AuthenticatorBase):
"""Authenticator for using JWT Flow"""

def __init__(self, consumer_key: str,
username: str, private_key, sandbox: bool = False,
json_dumps: JsonDumper = json.dumps,
json_loads: JsonLoader = json.loads) -> None:
"""
:param consumer_key: Consumer key from the Salesforce connected \
app definition
:param username: Salesforce username
:param private_key: Private key registered in Salesforce connected app
:param sandbox: Marks whether the authentication has to be done \
for a sandbox org or for a production org
:param json_dumps: Function for JSON serialization, the default is \
:func:`json.dumps
:param json_loads: Function for JSON deserialization, the default is \
:func:`json.loads`
"""
super().__init__(sandbox=sandbox,
json_dumps=json_dumps,
json_loads=json_loads)
#: OAuth2 client id
self.client_id = consumer_key
#: Salesforce username
self.username = username
#: Salesforce username
self.private_key = private_key

def __repr__(self) -> str:
"""Formal string representation"""
cls_name = type(self).__name__
return f"{cls_name}(consumer_key={reprlib.repr(self.client_id)}," \
f"username={reprlib.repr(self.username)}, " \
f"private_key={reprlib.repr(self.private_key)})"

async def _authenticate(self) -> Tuple[int, JsonObject]:
async with ClientSession(json_serialize=self.json_dumps) as session:
claim = {
'iss': self.client_id,
'exp': int(time.time()) + 300,
'aud': 'https://{}.salesforce.com'.format('test' if self._sandbox else 'login'),
'sub': self.username,
}
assertion = jwt.encode(claim, self.private_key, algorithm='RS256', headers={
'alg': 'RS256'})
data = {
"grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer",
"assertion": assertion,
}
response = await session.post(self._token_url, data=data)
response_data = await response.json(loads=self.json_loads)
return response.status, response_data

# pylint: enable=too-many-arguments
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@

INSTALL_REQUIRES = [
"aiocometd>=0.4.1,<0.5.0",
"aiohttp>=3.1,<4.0"
"aiohttp>=3.1,<4.0",
"PyJWT>=2.1.0,<2.4.1"
]
TESTS_REQUIRE = [
"asynctest>=0.12.0,<1.0.0",
Expand Down
96 changes: 92 additions & 4 deletions tests/test_auth.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from http import HTTPStatus
import reprlib

from asynctest import TestCase, mock
import time
from http import HTTPStatus
import jwt
from aiohttp.client_exceptions import ClientError
from asynctest import TestCase, mock

from aiosfstream.auth import AuthenticatorBase, PasswordAuthenticator, \
TOKEN_URL, SANDBOX_TOKEN_URL, RefreshTokenAuthenticator
TOKEN_URL, SANDBOX_TOKEN_URL, RefreshTokenAuthenticator, JWTAuthenticator
from aiosfstream.exceptions import AuthenticationError


Expand Down Expand Up @@ -246,3 +247,90 @@ def test_repr(self):
f"consumer_secret={reprlib.repr(auth.client_secret)}, "
f"refresh_token={reprlib.repr(auth.refresh_token)})"
)


class TestJWTAuthenticator(TestCase):
def setUp(self):
self.authenticator = JWTAuthenticator(
consumer_key="id",
username="username",
private_key="""-----BEGIN PRIVATE KEY-----
MIIEvAIBADANBgkqhkiG9w0BAQEFAASCBKYwggSiAgEAAoIBAQCT/YuMSf0WinGx
MS7TO3ssO3iLYwAlJa97yUSKXDbYX2QwFDng4a9OkAvUxf4pxvppy85+97/LY5W7
kQ0zxnn4VD12UvA5iL3X1GgUUfqyG56CbYVJP2livmR5cERrtxxg1dA3hhbFP8l1
JllsgNAi0KWfxi9Do0HPSDRmE4/4Dr9MsPmvcUTw7cVs+VVz8RDHWbMB7ptJb+30
Qwmp81XTecysVpxEs/ZcSo5ej4vNqjGwERSEoWDu3CY2SoRAaUvDP1xBgvOo7w2q
pSrBxo36t0iQxSg0q1JcyDncD0jHH8qYEm6ZbjdAGbaXqz1DAXECJcjh5yeiWTPn
uI0I6lltAgMBAAECggEAZmhys87Tc1C0YiCdvZCQqMgyn4oPpKUSbT+WzYQIc+x2
4QpuDt89t8jYVxH30qMd0C43pAb/UtiD2fsDPsvexWhN695y2+1zKbKTn1Qnfi68
npb5P/nAjJMh5iM5Ray914i+AF4qza5ZU1cJVJtC7ISjyA+Vz2Fe/fiCQgzReJ52
gc5KuOhF/0FBZtOvhqVubJYxngpZCGfpUi28MyN+f6fYm4vtvkDZLJETb+LcyR4M
inlFu2hwl+ZVXIXM8EybPyBphSzUszCUPnw20rVJRu2HWF0kE7jC3+BS5rBcaWoJ
qboGbrpbHl1AGnav3c+o3/JUr+TqoE+qwclwpimFBQKBgQDDNclovApsokScEsqX
RI/UD1WwgWKIztoRkPBCsJpLQPGv8eJJI25HWePQN6NzN8inKY0SO7+UN5u32Yg6
AWUWvAs2IVH17Y88YkbET2kQEX1kYVga6dCSnN2h98yED26+NtJq6kGVjswUZVU8
yG4cweZtR5JL+avTltBiIrL1MwKBgQDCE2ORym5uo18rsFQKyiJGJrmMGwTX8WvZ
BexWfVfhaMNwLgHcvrWp91d2TfL5LAStOLDoL0amlTggRDsqmy1N5S8fEPzfc+oY
4EZkDzT9dJZnRVHUdfLA1HOFnQ+L4Xd296ors7seuN71NLU9dRjWYlXsIe88gacW
ZOCW93o23wKBgBB7EQcLoSGszXgTyhDdU/tGVCizs7rzI8wJ3Y7z1AL4d68wD7e3
Cw9xEl+44s7Obd1XD7bzXmhIDZiHAA5Nodg6hgPK6l2F8eraLTlTrv4RS/HWmhaj
mN1X6wpKnnSjzOi4PimSn3jd9nLeX0TjcxBwemDNgxdw+8XAXNV8MnmrAoGABK0I
6htRe9Lt2RSfgb8LAluufsSr4jQL4Ce3YQIWGvU2OD6zhskFgXnXHp+UKhK4bh/+
iymQbzULLCPYtRcWCVlrQDldjlixnDXTHFgNc8naUdSmuxK4bZLw0ZhOJpWhFjmz
XOgwqvXTUV8ausdWeNvXrB/JLtEE4JI/owOFa0sCgYAY5jeBany8hIBQqAlnb6Ab
ZH+xkHDKhBWU6UWBHfJuZNVsGSLDYuZELyJ+tejy6gDffEjxAAps3ZxTe9Pae4WL
5oMTFlZd2r2yfjSplVPPhJwq07FXQJ89UlgGLuNrqSUsntImjSmqrC1Hy9owJvyU
GP6HlmVV5x1WaBt0HxRLXg==
-----END PRIVATE KEY-----
""")

@mock.patch("aiosfstream.auth.ClientSession")
async def test_authenticate(self, session_cls):
status = object()
response_data = object()
response_obj = mock.MagicMock()
response_obj.json = mock.CoroutineMock(return_value=response_data)
response_obj.status = status
session = mock.MagicMock()
session.__aenter__ = mock.CoroutineMock(return_value=session)
session.__aexit__ = mock.CoroutineMock()
session.post = mock.CoroutineMock(return_value=response_obj)
session_cls.return_value = session
claim = {
'iss': self.authenticator.client_id,
'exp': int(time.time()) + 300,
'aud': 'https://login.salesforce.com',
'sub': self.authenticator.username,
}
assertion = jwt.encode(claim, self.authenticator.private_key, algorithm='RS256', headers={
'alg': 'RS256'})
expected_data = {
"grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer",
"assertion": assertion,
}

result = await self.authenticator._authenticate()

self.assertEqual(result, (status, response_data))
session_cls.assert_called_with(
json_serialize=self.authenticator.json_dumps
)
session.post.assert_called_with(self.authenticator._token_url,
data=expected_data)
response_obj.json.assert_called_with(
loads=self.authenticator.json_loads
)
session.__aenter__.assert_called()
session.__aexit__.assert_called()

def test_repr(self):
result = repr(self.authenticator)

cls_name = type(self.authenticator).__name__
auth = self.authenticator
self.assertEqual(
result,
f"{cls_name}(consumer_key={reprlib.repr(auth.client_id)},"
f"username={reprlib.repr(auth.username)}, "
f"private_key={reprlib.repr(auth.private_key)})"
)