Skip to content
This repository has been archived by the owner on Oct 23, 2023. It is now read-only.

[#227] implement jku checking feature #424

Merged
merged 2 commits into from
Jun 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion beacon_api/conf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,14 @@ def parse_oauth2_config_file(path: str) -> Any:
"""Parse configuration file."""
config = ConfigParser()
config.read(path)
config_vars: Dict[str, Union[str, bool, None]] = {
config_vars: Dict[str, Union[str, bool, None, List[str]]] = {
"server": config.get("oauth2", "server"),
"issuers": config.get("oauth2", "issuers"),
"userinfo": config.get("oauth2", "userinfo"),
"audience": config.get("oauth2", "audience") or None,
"verify_aud": bool(strtobool(config.get("oauth2", "verify_aud"))),
"bona_fide_value": config.get("oauth2", "bona_fide_value"),
"trusted_jkus": config.get("oauth2", "trusted_jkus", fallback="").split(","),
}
return convert(config_vars)

Expand Down
5 changes: 5 additions & 0 deletions beacon_api/conf/config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,8 @@ audience=
# If your service is not part of any network or AAI, but you still want to use tokens
# produced by other AAI parties, set this value to False to skip the audience validation step
verify_aud=False

# Comma separated list of trusted JKUs for checking passports
# Passport with an untrusted JKU will be denied access
# Leave empty to disable JKU checking
trusted_jkus=
7 changes: 7 additions & 0 deletions beacon_api/permissions/ga4gh.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,13 @@ async def get_ga4gh_permissions(token: str) -> Tuple[set, bool]:
for encoded_passport in encoded_passports:
# Decode passport
header, payload = await decode_passport(encoded_passport)
# If trusted_jkus variable is set, only allow passports with a trusted JKU
if not OAUTH2_CONFIG.trusted_jkus == [""]:
# Skip passports with untrusted JKUs
passport_jku = header.get("jku")
if passport_jku not in OAUTH2_CONFIG.trusted_jkus:
LOG.debug("Untrusted JKU.")
continue
# Sort passports that carry dataset permissions
pass_type = payload.get("ga4gh_visa_v1", {}).get("type")
if pass_type == "ControlledAccessGrants": # nosec
Expand Down
5 changes: 5 additions & 0 deletions tests/test.ini
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,8 @@ audience=
# If your service is not part of any network or AAI, but you still want to use tokens
# produced by other AAI parties, set this value to False to skip the audience validation step
verify_aud=False

# Comma separated list of trusted JKUs for checking passports
# Passport with an untrusted JKU will be denied access
# Leave empty to disable JKU checking
trusted_jkus=http://test.csc.fi/jwk
51 changes: 51 additions & 0 deletions tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from .test_app import PARAMS, generate_token
from testfixtures import TempDirectory
from test.support.os_helper import EnvironmentVarGuard
from beacon_api.conf import OAUTH2_CONFIG


def mock_token(bona_fide, permissions, auth):
Expand Down Expand Up @@ -70,6 +71,11 @@ async def load_datafile(self, vcf, datafile, datasetId, n=1000, min_ac=1):
return ["datasetId", "variants"]


async def mock_get_ga4gh_controlled(input):
"""Mock retrieve dataset permissions."""
return input


class TestBasicFunctions(unittest.IsolatedAsyncioTestCase):
"""Test supporting functions."""

Expand Down Expand Up @@ -482,5 +488,50 @@ async def test_get_ga4gh_permissions(self, m_userinfo, m_decode, m_controlled, m
self.assertEqual(bona_fide_status, True)


class TestCaseCheckJku(unittest.IsolatedAsyncioTestCase):
"""Test case."""

@unittest.mock.patch("beacon_api.permissions.ga4gh.get_ga4gh_bona_fide")
@unittest.mock.patch("beacon_api.permissions.ga4gh.get_ga4gh_controlled", side_effect=mock_get_ga4gh_controlled)
@unittest.mock.patch("beacon_api.permissions.ga4gh.decode_passport")
@unittest.mock.patch("beacon_api.permissions.ga4gh.retrieve_user_data")
async def test_jku_check(self, m_userinfo, m_decode, m_controller, m_bonafide):
"""Test trusted and untrusted jku."""
# Test: trusted jku
m_userinfo.return_value = [""]
header = {"jku": "http://test.csc.fi/jwk"}
payload = {"ga4gh_visa_v1": {"type": "ControlledAccessGrants"}}
m_decode.return_value = header, payload
m_bonafide.return_value = False
dataset_permissions, bona_fide_status = await get_ga4gh_permissions({})
self.assertEqual(dataset_permissions, [("", header)])
self.assertEqual(bona_fide_status, False)
# Test: untrusted jku
m_userinfo.return_value = [""]
header = {"jku": "untrusted_jku"}
payload = {"ga4gh_visa_v1": {"type": "ControlledAccessGrants"}}
m_decode.return_value = header, payload
m_bonafide.return_value = False
dataset_permissions, bona_fide_status = await get_ga4gh_permissions({})
self.assertEqual(dataset_permissions, [])
self.assertEqual(bona_fide_status, False)

@unittest.mock.patch("beacon_api.permissions.ga4gh.OAUTH2_CONFIG", new=OAUTH2_CONFIG._replace(trusted_jkus=[""]))
@unittest.mock.patch("beacon_api.permissions.ga4gh.get_ga4gh_bona_fide")
@unittest.mock.patch("beacon_api.permissions.ga4gh.get_ga4gh_controlled", side_effect=mock_get_ga4gh_controlled)
@unittest.mock.patch("beacon_api.permissions.ga4gh.decode_passport")
@unittest.mock.patch("beacon_api.permissions.ga4gh.retrieve_user_data")
async def test_jku_check_not_active(self, m_userinfo, m_decode, m_controller, m_bonafide):
"""Test if jku check is skipped when trusted_jkus config var is not set."""
m_userinfo.return_value = [""]
header = {"jku": "untrusted_jku"}
payload = {"ga4gh_visa_v1": {"type": "ControlledAccessGrants"}}
m_decode.return_value = header, payload
m_bonafide.return_value = False
dataset_permissions, bona_fide_status = await get_ga4gh_permissions({})
self.assertEqual(dataset_permissions, [("", header)])
self.assertEqual(bona_fide_status, False)


if __name__ == "__main__":
unittest.main()