diff --git a/.travis.yml b/.travis.yml index e5d8bc9..03f9c5d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,7 +8,7 @@ python: addons: apt: packages: - - + - install: - pip install codecov - pip install tox @@ -26,6 +26,6 @@ deploy: tags: true distributions: bdist_wheel skip_existing: true - user: __token__ + username: __token__ password: - secure: "kW7wRRnQDkeD/bvDrktoLTLrFV4W0B548PhtXVBD7rfbx09oE+jmoM6Ojc2y+t18XQ0zYIfO1qXdoenzti2YiY8xGgA1EYDpHuT0d+DX3tPuLiXou+pts+dXo5Q0YJRFlmBgxtyT+eHhScojDPvDmCFQodIQhiGK1cyzV1p+jE9W/wjiCOQWSQ0LoIV/ajjIMArEmucqwU1wlXxPPgVaV2EuZ7hXBULA6C1DKJENja+qeORtDdW6DMXPJQ4AD74dR+IHaqX4W+mRjRO8wZHKBxrxQ65nZErcfwjbqWRhPIUWzkVngC21BWrs/5eMvxuSPDj4XUY/wiB/1xR883bM6YypEqlCM5x3CFA6a65iI+dfT4iqQRdwOF04oxDkNf2N1LBx8Pv01KxDs5cblv3i/TQHoBIKH8wY6yxrVT2gtcGOwj5+/Pay3PGUN4S8YyzzmSuX/trQgWl/6BD2UPDRchhz24AfV8vXPr6ljBllNdGtWVWJd0ad7u2nqTrUcec909cAhz9SHP1LO8IqCjh8XGdQATOmZXptXJLLmysN9+OwgyTA4SH0Q4FKPG/xriq2cbMCfBTnBOTb5N9ZCm1VeGZlcMdva9UDcRWjtEStAy+gnYQc10EBD060O0jUDB9qdm5zyiTSqOY701828Z2a5uMWXqWUlEsbF61BN6cuD4s=" + secure: sCvQSnLW2R9i01dTvG1JyZ+lU99jXjEmPeniSTB8SAZMlfIbuo8I5kjkKxxaa4uTOU3mZiWDG/wq/liUuUzxjXYLA47gwVqU6QrkgIh0/QqGIN1+GYtHqZZETueGClSS8drCgYchpknNTA+30JL/kpt4fjq5hSUFII5BuybVYbh5WVnmwjhZOnZdKxVuQrrfzNVe14wr/TY7VvtL7W5bSCigxYh4oSUiFB6WkFyPZFetqYRO5eCTRt4Tn/hT3Y5Aiz033hMFhrjB4XxuWK7P1f6WVVGRMfAwa4XuC3w0oPSp/9I0cMBtLkExs+tdq25lgmJOcdzVwo3QHPs4pn6AjHKPyoTr/jiuysKnSZCwgcMPFpORgrOWQh/qFxHqNFI0QzNasZXhEBDJaaR2pN/GLjW97NiuaCFuZ/me0SWtpdhPuJpjgx9xxwymRRC4aBu6aPrzoxFGABp4Bkica7KsBmWgQ0ZFMPO6bXB6bU5w8a+wlOY/6I8b06/cIOIyMh8Tas7UwO2/e0L65qhAkR3u1evmagex94w9UameP+518yYik0L5uwrn7vPeRNpH1UEctd3RG73HlEdu8wMCo0pPrvFqsQGtIiPTLPx99453z8Z/raWwAa1Zpn2c8mSm9YIOQZ20rsiFy1vfwAm2b7/dfm005hUirmfwtiG7Oq8iLdg= diff --git a/setup.py b/setup.py index 58825e1..11f55a7 100644 --- a/setup.py +++ b/setup.py @@ -35,7 +35,8 @@ author="Roland Hedberg", author_email="roland@catalogix.se", license="Apache 2.0", - packages=["cryptojwt", "cryptojwt/jwe", "cryptojwt/jwk", "cryptojwt/jws", "cryptojwt/tools"], + packages=["cryptojwt", "cryptojwt/jwe", "cryptojwt/jwk", "cryptojwt/jws", "cryptojwt/tools", + "cryptojwt/serialize"], package_dir={"": "src"}, classifiers=[ "Development Status :: 4 - Beta", diff --git a/src/cryptojwt/__init__.py b/src/cryptojwt/__init__.py index 3fc2f34..fb25595 100644 --- a/src/cryptojwt/__init__.py +++ b/src/cryptojwt/__init__.py @@ -21,7 +21,7 @@ except ImportError: pass -__version__ = '0.8.4' +__version__ = '1.0.0' logger = logging.getLogger(__name__) diff --git a/src/cryptojwt/exception.py b/src/cryptojwt/exception.py index a561120..ae073ea 100644 --- a/src/cryptojwt/exception.py +++ b/src/cryptojwt/exception.py @@ -43,6 +43,18 @@ class MissingKey(JWKESTException): """ No usable key """ +class KeyIOError(Exception): + pass + + +class UnknownKeyType(KeyIOError): + pass + + +class UpdateFailed(KeyIOError): + pass + + class UnknownKeytype(Invalid): """An unknown key type""" @@ -87,18 +99,10 @@ class WrongKeyType(JWKESTException): pass -class UnknownKeyType(JWKESTException): - pass - - class UnsupportedKeyType(JWKESTException): pass -class UpdateFailed(JWKESTException): - pass - - class WrongUsage(JWKESTException): pass diff --git a/src/cryptojwt/jwk/__init__.py b/src/cryptojwt/jwk/__init__.py index a28be9b..63582af 100644 --- a/src/cryptojwt/jwk/__init__.py +++ b/src/cryptojwt/jwk/__init__.py @@ -248,6 +248,9 @@ def appropriate_for(self, usage, **kwargs): elif self.key_ops: return usage in self.key_ops + def update(self): + pass + def pems_to_x5c(cert_chain): """ @@ -303,6 +306,12 @@ def certificate_fingerprint(der, hash="sha256"): return ':'.join([fp[i:i + 2] for i in range(0, len(fp), 2)]).upper() +def calculate_x5t(der, hash='sha1'): + val = certificate_fingerprint(der, hash) + val = val.replace(':', '') + return base64.b64encode(as_bytes(val)) + + def pem_hash(pem_file): with open(pem_file, "r") as fp: pem = fp.read() diff --git a/src/cryptojwt/jwk/jwk.py b/src/cryptojwt/jwk/jwk.py index 38f48cc..2e4827c 100644 --- a/src/cryptojwt/jwk/jwk.py +++ b/src/cryptojwt/jwk/jwk.py @@ -66,7 +66,7 @@ def key_from_jwk_dict(jwk_dict, private=None): """ # uncouple from the original item - _jwk_dict = copy.copy(jwk_dict) + _jwk_dict = copy.deepcopy(jwk_dict) if 'kty' not in _jwk_dict: raise MissingValue('kty missing') diff --git a/src/cryptojwt/jwt.py b/src/cryptojwt/jwt.py index 094ad18..44578c5 100755 --- a/src/cryptojwt/jwt.py +++ b/src/cryptojwt/jwt.py @@ -80,7 +80,7 @@ def __init__(self, key_jar=None, iss='', lifetime=0, enc_enc="A128CBC-HS256", enc_alg="RSA1_5", msg_cls=None, iss2msg_cls=None, skew=15, allowed_sign_algs=None, allowed_enc_algs=None, - allowed_enc_encs=None): + allowed_enc_encs=None, zip=''): self.key_jar = key_jar # KeyJar instance self.iss = iss # My identifier self.lifetime = lifetime # default life time of the signature @@ -99,6 +99,7 @@ def __init__(self, key_jar=None, iss='', lifetime=0, self.allowed_sign_algs = allowed_sign_algs self.allowed_enc_algs = allowed_enc_algs self.allowed_enc_encs = allowed_enc_encs + self.zip = zip def receiver_keys(self, recv, use): """ @@ -107,7 +108,7 @@ def receiver_keys(self, recv, use): :param use: What the keys should be usable for :return: A list of keys. """ - return self.key_jar.get(use, owner=recv) + return self.key_jar.get(use, issuer_id=recv) def receivers(self): """Return a list of identifiers. @@ -117,20 +118,22 @@ def receivers(self): """ return self.key_jar.owners - def my_keys(self, owner_id='', use='sig'): - _k = self.key_jar.get(use, owner=owner_id) - if owner_id != '': + def my_keys(self, issuer_id='', use='sig'): + _k = self.key_jar.get(use, issuer_id=issuer_id) + if issuer_id != '': try: - _k.extend(self.key_jar.get(use, owner='')) + _k.extend(self.key_jar.get(use, issuer_id='')) except KeyError: pass return _k - def _encrypt(self, payload, recv, cty='JWT'): + def _encrypt(self, payload, recv, cty='JWT', zip=''): kwargs = {"alg": self.enc_alg, "enc": self.enc_enc} if cty: kwargs["cty"] = cty + if zip: + kwargs['zip'] = zip # use the clients public key for encryption _jwe = JWE(payload, **kwargs) @@ -173,15 +176,15 @@ def pack_init(self, recv, aud): return argv - def pack_key(self, owner_id='', kid=''): + def pack_key(self, issuer_id='', kid=''): """ Find a key to be used for signing the Json Web Token - :param owner_id: Owner of the keys to chose from + :param issuer_id: Owner of the keys to chose from :param kid: Key ID :return: One key """ - keys = pick_key(self.my_keys(owner_id, 'sig'), 'sig', alg=self.alg, + keys = pick_key(self.my_keys(issuer_id, 'sig'), 'sig', alg=self.alg, kid=kid) if not keys: @@ -189,12 +192,12 @@ def pack_key(self, owner_id='', kid=''): return keys[0] # Might be more then one if kid == '' - def pack(self, payload=None, kid='', owner='', recv='', aud=None, **kwargs): + def pack(self, payload=None, kid='', issuer_id='', recv='', aud=None, **kwargs): """ :param payload: Information to be carried as payload in the JWT :param kid: Key ID - :param owner: The owner of the the keys that are to be used for signing + :param issuer_id: The owner of the the keys that are to be used for signing :param recv: The intended immediate receiver :param aud: Intended audience for this JWS/JWE, not expected to contain the recipient. @@ -221,12 +224,12 @@ def pack(self, payload=None, kid='', owner='', recv='', aud=None, **kwargs): _args['jti'] = _jti - if not owner and self.iss: - owner = self.iss + if not issuer_id and self.iss: + issuer_id = self.iss if self.sign: if self.alg != 'none': - _key = self.pack_key(owner, kid) + _key = self.pack_key(issuer_id, kid) # _args['kid'] = _key.kid else: _key = None @@ -238,9 +241,9 @@ def pack(self, payload=None, kid='', owner='', recv='', aud=None, **kwargs): if _encrypt: if not self.sign: - return self._encrypt(_sjwt, recv, cty='json') + return self._encrypt(_sjwt, recv, cty='json', zip=self.zip) - return self._encrypt(_sjwt, recv) + return self._encrypt(_sjwt, recv, zip=self.zip) else: return _sjwt diff --git a/src/cryptojwt/key_bundle.py b/src/cryptojwt/key_bundle.py index 2fdac77..dbd9cfd 100755 --- a/src/cryptojwt/key_bundle.py +++ b/src/cryptojwt/key_bundle.py @@ -1,4 +1,5 @@ """Implementation of a Key Bundle.""" +import copy import json import logging import os @@ -9,7 +10,6 @@ from cryptojwt.jwk.ec import NIST2SEC from cryptojwt.jwk.hmac import new_sym_key -from .exception import DeSerializationNotPossible from .exception import JWKException from .exception import UnknownKeyType from .exception import UnsupportedAlgorithm @@ -153,7 +153,7 @@ def ec_init(spec): class KeyBundle: """The Key Bundle""" - def __init__(self, keys=None, source="", cache_time=300, verify_ssl=True, + def __init__(self, keys=None, source="", cache_time=300, fileformat="jwks", keytype="RSA", keyusage=None, kid='', httpc=None, httpc_params=None): """ @@ -167,7 +167,6 @@ def __init__(self, keys=None, source="", cache_time=300, verify_ssl=True, :param keys: A dictionary or a list of dictionaries with the keys ["kty", "key", "alg", "use", "kid"] :param source: Where the key set can be fetch from - :param verify_ssl: Verify the SSL cert used by the server :param fileformat: For a local file either "jwks" or "der" :param keytype: Iff local file and 'der' format what kind of key it is. presently 'rsa' and 'ec' are supported. @@ -191,7 +190,7 @@ def __init__(self, keys=None, source="", cache_time=300, verify_ssl=True, self.imp_jwks = None self.last_updated = 0 self.last_remote = None # HTTP Date of last remote update - self.last_local = None # UNIX timestamp of last local update + self.last_local = None # UNIX timestamp of last local update if httpc: self.httpc = httpc @@ -256,6 +255,8 @@ def do_keys(self, keys): :param keys: :return: """ + _new_key = [] + for inst in keys: if inst["kty"].lower() in K2C: inst["kty"] = inst["kty"].lower() @@ -290,12 +291,15 @@ def do_keys(self, keys): if _key not in self._keys: if not _key.kid: _key.add_kid() - self._keys.append(_key) + _new_key.append(_key) _error = '' - break + if _error: LOGGER.warning('While loading keys, %s', _error) + if _new_key: + self._keys.extend(_new_key) + self.last_updated = time.time() def do_local_jwk(self, filename): @@ -440,7 +444,7 @@ def update(self): """ res = True # An update was successful if self.source: - _keys = self._keys # just in case + _old_keys = self._keys # just in case # reread everything self._keys = [] @@ -456,11 +460,11 @@ def update(self): res = self.do_remote() except Exception as err: LOGGER.error('Key bundle update failed: %s', err) - self._keys = _keys # restore + self._keys = _old_keys # restore return False now = time.time() - for _key in _keys: + for _key in _old_keys: if _key not in self._keys: if not _key.inactive_since: # If already marked don't mess _key.inactive_since = now @@ -616,7 +620,24 @@ def mark_as_inactive(self, kid): :param kid: The Key Identifier """ k = self.get_key_with_kid(kid) - k.inactive_since = time.time() + if k: + self._keys.remove(k) + k.inactive_since = time.time() + self._keys.append(k) + return True + else: + return False + + def mark_all_as_inactive(self): + """ + Mark a specific key as inactive based on the keys KeyID. + """ + _keys = self.keys() + _updated = [] + for k in _keys: + k.inactive_since = time.time() + _updated.append(k) + self._keys = _updated def remove_outdated(self, after, when=0): """ @@ -637,13 +658,16 @@ def remove_outdated(self, after, when=0): after = float(after) _kl = [] + changed = False for k in self._keys: if k.inactive_since and k.inactive_since + after < now: + changed = True continue _kl.append(k) - self._keys = _kl + self._keys= _kl + return changed def __contains__(self, key): return key in self._keys @@ -655,10 +679,10 @@ def copy(self): :return: The copy """ _bundle = KeyBundle() - _bundle.set(self._keys[:]) + _bundle._keys = self._keys[:] _bundle.cache_time = self.cache_time - _bundle.httpc_params = self.httpc_params.copy() + _bundle.httpc_params = copy.deepcopy(self.httpc_params) if self.source: _bundle.source = self.source _bundle.fileformat = self.fileformat @@ -741,7 +765,9 @@ def keybundle_from_local_file(filename, typ, usage, keytype="RSA"): usage = harmonize_usage(usage) if typ.lower() == "jwks": - _bundle = KeyBundle(source=filename, fileformat="jwks", keyusage=usage) + _bundle = KeyBundle(source=filename, + fileformat="jwks", + keyusage=usage) elif typ.lower() == "der": _bundle = KeyBundle(source=filename, fileformat="der", @@ -840,32 +866,27 @@ def build_key_bundle(key_conf, kid_template=""): kid = 0 - complete_bundle = KeyBundle() + _bundles = [] for spec in key_conf: typ = spec["type"].upper() + _bundle = None if typ == "RSA": if "key" in spec and spec["key"]: - error_to_catch = (OSError, IOError, - DeSerializationNotPossible) - try: + if os.path.isfile(spec["key"]): _bundle = KeyBundle(source="file://%s" % spec["key"], fileformat="der", - keytype=typ, keyusage=spec["use"]) - except error_to_catch: - _bundle = rsa_init(spec) + keytype=typ, + keyusage=spec["use"]) else: _bundle = rsa_init(spec) elif typ == "EC": if "key" in spec and spec["key"]: - error_to_catch = (OSError, IOError, - DeSerializationNotPossible) - try: + if os.path.isfile(spec["key"]): _bundle = KeyBundle(source="file://%s" % spec["key"], fileformat="der", - keytype=typ, keyusage=spec["use"]) - except error_to_catch: - _bundle = ec_init(spec) + keytype=typ, + keyusage=spec["use"]) else: _bundle = ec_init(spec) elif typ.lower() == "oct": @@ -873,11 +894,20 @@ def build_key_bundle(key_conf, kid_template=""): else: continue + if not _bundle: + continue + _set_kid(spec, _bundle, kid_template, kid) + _bundles.append(_bundle) - complete_bundle.extend(_bundle.keys()) + if _bundles: + complete_bundle = KeyBundle() + for _bundle in _bundles: + complete_bundle.extend(_bundle.keys()) - return complete_bundle + return complete_bundle + else: + return None def _cmp(kd1, kd2): @@ -1065,8 +1095,12 @@ def update_key_bundle(key_bundle, diff): pass else: _now = time.time() + _keys = key_bundle.keys() for k in _del: + _keys.remove(k) k.inactive_since = _now + _keys.append(k) + key_bundle.set(_keys) def key_rollover(bundle): diff --git a/src/cryptojwt/key_issuer.py b/src/cryptojwt/key_issuer.py new file mode 100755 index 0000000..2df6474 --- /dev/null +++ b/src/cryptojwt/key_issuer.py @@ -0,0 +1,635 @@ +import json +import logging +import os + +from requests import request + +from .jwe.utils import alg2keytype as jwe_alg2keytype +from .jws.utils import alg2keytype as jws_alg2keytype +from .key_bundle import KeyBundle +from .key_bundle import build_key_bundle + +__author__ = 'Roland Hedberg' + +from .key_bundle import key_diff +from .key_bundle import update_key_bundle + +from .utils import importer + +from .utils import qualified_name + +logger = logging.getLogger(__name__) + + +class KeyIssuer(object): + """ A key issuer instance contains a number of KeyBundles. """ + + def __init__(self, ca_certs=None, keybundle_cls=KeyBundle, + remove_after=3600, httpc=None, httpc_params=None, + name=''): + """ + KeyIssuer init function + + :param ca_certs: CA certificates, to be used for HTTPS + :param keybundle_cls: The KeyBundle class + :param remove_after: How long keys marked as inactive will remain in the key Jar. + :param httpc: A HTTP client to use. Default is Requests request. + :param httpc_params: HTTP request parameters + :param name: Issuer identifier + :return: KeyIssuer instance + """ + + self._bundles = [] + + self.keybundle_cls = keybundle_cls + self.name = name + + self.spec2key = {} + self.ca_certs = ca_certs + self.remove_after = remove_after + self.httpc = httpc or request + self.httpc_params = httpc_params or {} + + def __repr__(self) -> str: + return ''.format(self.name, self.key_summary()) + + def __getitem__(self, item): + return self.get_bundles()[item] + + def set(self, items): + self._bundles = items + + def get_bundles(self): + return [kb for kb in self._bundles] + + def add_url(self, url, **kwargs): + """ + Add a set of keys by url. This method will create a + :py:class:`oidcmsg.key_bundle.KeyBundle` instance with the + url as source specification. If no file format is given it's assumed + that what's on the other side is a JWKS. + + :param url: Where can the key/-s be found + :param kwargs: extra parameters for instantiating KeyBundle + :return: A :py:class:`oidcmsg.oauth2.keybundle.KeyBundle` instance + """ + + if not url: + raise KeyError("No url given") + + logger.debug('httpc_params: %s', self.httpc_params) + + if "/localhost:" in url or "/localhost/" in url: + _params = self.httpc_params.copy() + _params['verify'] = False + kb = self.keybundle_cls(source=url, httpc=self.httpc, httpc_params=_params, **kwargs) + else: + kb = self.keybundle_cls(source=url, httpc=self.httpc, httpc_params=self.httpc_params, + **kwargs) + + kb.update() + self._bundles.append(kb) + + return kb + + def add_symmetric(self, key, usage=None): + """ + Add a symmetric key. This is done by wrapping it in a key bundle + cloak since KeyJar does not handle keys directly but only through + key bundles. + + :param key: The key + :param usage: What the key can be used for signing/signature + verification (sig) and/or encryption/decryption (enc) + """ + + if usage is None: + self._bundles.append(self.keybundle_cls([{"kty": "oct", "key": key}])) + else: + for use in usage: + self._bundles.append(self.keybundle_cls([{"kty": "oct", "key": key, "use": use}])) + + def add_kb(self, kb): + """ + Add a key bundle. + + :param kb: A :py:class:`oidcmsg.key_bundle.KeyBundle` instance + """ + self._bundles.append(kb) + + def add(self, item, **kwargs): + if isinstance(item, KeyBundle): + self.add_kb(item) + elif item.startswith('http://') or item.startswith('file://') or item.startswith( + 'https://'): + self.add_url(item, **kwargs) + else: + self.add_symmetric(item, **kwargs) + + def all_keys(self): + """ + Get all the keys that belong to an entity. + + :return: A possibly empty list of keys + """ + res = [] + for kb in self._bundles: + res.extend(kb.keys()) + return res + + def __contains__(self, item): + for kb in self._bundles: + if item in kb: + return True + return False + + def items(self): + _res = {} + for kb in self._bundles: + if kb.source in _res: + _res[kb.source].append(kb) + else: + _res[kb.source] = [kb] + return _res + + def __str__(self): + _res = {} + for kb in self._bundles: + key_list = [] + for key in kb.keys(): + if key.inactive_since: + key_list.append( + '*{}:{}:{}'.format(key.kty, key.use, key.kid)) + else: + key_list.append( + '{}:{}:{}'.format(key.kty, key.use, key.kid)) + if kb.source in _res: + _res[kb.source] += ', ' + ', '.join(key_list) + else: + _res[kb.source] = ', '.join(key_list) + return json.dumps(_res) + + def load_keys(self, jwks_uri='', jwks=None): + """ + Fetch keys from another server + + :param jwks_uri: A URL pointing to a site that will return a JWKS + :param jwks: A dictionary representation of a JWKS + :return: Dictionary with usage as key and keys as values + """ + + if jwks_uri: + self.add_url(jwks_uri) + elif jwks: + # jwks should only be considered if no jwks_uri is present + _keys = jwks['keys'] + self._bundles.append(self.keybundle_cls(_keys)) + + def find(self, source): + """ + Find a key bundle based on the source of the keys + + :param source: A source url + :return: A list of :py:class:`oidcmsg.key_bundle.KeyBundle` instances, possibly empty + """ + return [kb for kb in self._bundles if kb.source == source] + + def export_jwks(self, private=False, usage=None): + """ + Produces a dictionary that later can be easily mapped into a + JSON string representing a JWKS. + + :param private: Whether it should be the private keys or the public + :param usage: If only keys for a special usage should be included + :return: A dictionary with one key: 'keys' + """ + keys = [] + for kb in self._bundles: + keys.extend([k.serialize(private) for k in kb.keys() if + k.inactive_since == 0 and ( + usage is None or (hasattr(k, 'use') and k.use == usage))]) + return {"keys": keys} + + def export_jwks_as_json(self, private=False, usage=None): + """ + Export a JWKS as a JSON document. + + :param private: Whether it should be the private keys or the public + :return: A JSON representation of a JWKS + """ + return json.dumps(self.export_jwks(private, usage=usage)) + + def import_jwks(self, jwks): + """ + Imports all the keys that are represented in a JWKS + + :param jwks: Dictionary representation of a JWKS + """ + try: + _keys = jwks["keys"] + except KeyError: + raise ValueError('Not a proper JWKS') + else: + self._bundles.append( + self.keybundle_cls(_keys, httpc=self.httpc, httpc_params=self.httpc_params)) + + def import_jwks_as_json(self, jwks, issuer_id): + """ + Imports all the keys that are represented in a JWKS expressed as a + JSON object + + :param jwks: JSON representation of a JWKS + :param issuer_id: Who 'owns' the JWKS + """ + return self.import_jwks(json.loads(jwks)) + + def import_jwks_from_file(self, filename, issuer_id): + with open(filename) as jwks_file: + self.import_jwks_as_json(jwks_file.read(), issuer_id) + + def remove_outdated(self, when=0): + """ + Goes through the complete list of issuers and for each of them removes + outdated keys. + Outdated keys are keys that has been marked as inactive at a time that + is longer ago then some set number of seconds (when). If when=0 the + the base time is set to now. + The number of seconds are carried in the remove_after parameter in the + key jar. + + :param when: To facilitate testing + """ + kbl = [] + changed = False + for kb in self._bundles: + if kb.remove_outdated(self.remove_after, when=when): + changed = True + kbl.append(kb) + + if changed: + self._bundles = kbl + + def get(self, key_use, key_type="", kid=None, alg='', **kwargs): + """ + Get all keys that matches a set of search criteria + + :param key_use: A key useful for this usage (enc, dec, sig, ver) + :param key_type: Type of key (rsa, ec, oct, ..) + :param kid: A Key Identifier + :return: A possibly empty list of keys + """ + + if key_use in ["dec", "enc"]: + use = "enc" + else: + use = "sig" + + if not key_type: + if alg: + if use == 'sig': + key_type = jws_alg2keytype(alg) + else: + key_type = jwe_alg2keytype(alg) + + lst = [] + for bundle in self._bundles: + if key_type: + if key_use in ['ver', 'dec']: + _bkeys = bundle.get(key_type, only_active=False) + else: + _bkeys = bundle.get(key_type) + else: + _bkeys = bundle.keys() + for key in _bkeys: + if key.inactive_since and key_use != "sig": + # Skip inactive keys unless for signature verification + continue + if not key.use or use == key.use: + if kid: + if key.kid == kid: + lst.append(key) + break + else: + continue + else: + lst.append(key) + + # If key algorithm is defined only return keys that can be used. + if alg: + lst = [key for key in lst if not key.alg or key.alg == alg] + + # if elliptic curve, have to check if I have a key of the right curve + if key_type == "EC" and "alg" in kwargs: + name = "P-{}".format(kwargs["alg"][2:]) # the type + _lst = [] + for key in lst: + if name != key.crv: + continue + _lst.append(key) + lst = _lst + + return lst + + def copy(self): + """ + Make deep copy of this key issuer. + + :return: A :py:class:`oidcmsg.key_issuer.KeyIssuer` instance + """ + ki = KeyIssuer() + ki._bundles = [kb.copy() for kb in self._bundles] + ki.httpc_params = self.httpc_params + ki.httpc = self.httpc + ki.keybundle_cls = self.keybundle_cls + return ki + + def __len__(self): + nr = 0 + for kb in self._bundles: + nr += len(kb) + return nr + + def dump(self, exclude=None): + """ + Returns the content as a dictionary. + + :return: A dictionary + """ + + _bundles = [] + for kb in self._bundles: + _bundles.append(kb.dump()) + + info = { + 'name': self.name, + 'bundles': _bundles, + 'keybundle_cls': qualified_name(self.keybundle_cls), + 'spec2key': self.spec2key, + 'ca_certs': self.ca_certs, + 'remove_after': self.remove_after, + 'httpc_params': self.httpc_params + } + return info + + def load(self, info): + """ + + :param items: A list with the information + :return: + """ + self.name = info['name'] + self.keybundle_cls = importer(info['keybundle_cls']) + self.spec2key = info['spec2key'] + self.ca_certs = info['ca_certs'] + self.remove_after = info['remove_after'] + self.httpc_params = info['httpc_params'] + self._bundles = [KeyBundle().load(val) for val in info['bundles']] + return self + + def update(self): + for kb in self._bundles: + kb.update() + + def mark_as_inactive(self, kid): + kbl = [] + changed = False + for kb in self._bundles: + if kb.mark_as_inactive(kid): + changed = True + kbl.append(kb) + if changed: + self._bundles = kbl + + def mark_all_keys_as_inactive(self): + kbl = [] + for kb in self._bundles: + kb.mark_all_as_inactive() + kbl.append(kb) + + self._bundles = kbl + + def key_summary(self): + """ + Return a text representation of all the keys. + + :return: A text representation of the keys + """ + key_list = [] + for kb in self._bundles: + for key in kb.keys(): + if key.inactive_since: + key_list.append( + '*{}:{}:{}'.format(key.kty, key.use, key.kid)) + else: + key_list.append( + '{}:{}:{}'.format(key.kty, key.use, key.kid)) + return ', '.join(key_list) + + def __iter__(self): + for bundle in self._bundles: + yield bundle + + def __eq__(self, other): + if not isinstance(other, self.__class__): + return False + + if len(other.all_keys()) != len(self.all_keys()): + return False + + for k in self.all_keys(): + if k not in other: + return False + + for k in other.all_keys(): + if k not in self: + return False + + return True + + def rotate_keys(self, key_conf, kid_template=""): + """ + + :param key_conf: The configuration for the new keys + :param issuer: KeyIssuer instance + :param kid_template: A key id template + :return: + """ + new_keys = build_keyissuer(key_conf, kid_template) + self.mark_all_keys_as_inactive() + for kb in new_keys: + self.add_kb(kb) + return self + + +# ============================================================================= + + +def build_keyissuer(key_conf, kid_template="", key_issuer=None, issuer_id=''): + """ + Builds a :py:class:`oidcmsg.key_issuer.KeyIssuer` instance or adds keys to + an existing KeyIssuer instance based on a key specification. + + An example of such a specification:: + + keys = [ + {"type": "RSA", "key": "cp_keys/key.pem", "use": ["enc", "sig"]}, + {"type": "EC", "crv": "P-256", "use": ["sig"], "kid": "ec.1"}, + {"type": "EC", "crv": "P-256", "use": ["enc"], "kid": "ec.2"} + {"type": "oct", "bytes": 32, "use":["sig"]} + ] + + Keys in this specification are: + + type + The type of key. Presently only 'rsa', 'oct' and 'ec' supported. + + key + A name of a file where a key can be found. Works with PEM encoded + RSA and EC private keys. + + use + What the key should be used for + + crv + The elliptic curve that should be used. Only applies to elliptic curve + keys :-) + + kid + Key ID, can only be used with one usage type is specified. If there + are more the one usage type specified 'kid' will just be ignored. + + :param key_conf: The key configuration + :param kid_template: A template by which to build the key IDs. If no + kid_template is given then the built-in function add_kid() will be used. + :param key_issuer: If an keyIssuer instance the new keys are added to this key issuer. + :param issuer_id: The identifier of the issuer + :return: A KeyIssuer instance + """ + + bundle = build_key_bundle(key_conf, kid_template) + if bundle is None: + return None + + if key_issuer is None: + key_issuer = KeyIssuer(name=issuer_id) + + key_issuer.add(bundle) + + return key_issuer + + +def init_key_issuer(public_path='', private_path='', key_defs='', read_only=True): + """ + A number of cases here: + + 1. A private path is given + + a. The file exists and a JWKS is found there. + From that JWKS a KeyJar instance is built. + b. + If the private path file doesn't exit the key definitions are + used to build a KeyJar instance. A JWKS with the private keys are + written to the file named in private_path. + + If a public path is also provided a JWKS with public keys are written + to that file. + + 2. A public path is given but no private path. + + a. If the public path file exists then the JWKS in that file is used to + construct a KeyJar. + b. If no such file exists then a KeyJar will be built + based on the key_defs specification and a JWKS with the public keys + will be written to the public path file. + + 3. If neither a public path nor a private path is given then a KeyJar is + built based on the key_defs specification and no JWKS will be written + to file. + + In all cases a KeyJar instance is returned + + The keys stored in the KeyJar will be stored under the '' identifier. + + :param public_path: A file path to a file that contains a JWKS with public + keys + :param private_path: A file path to a file that contains a JWKS with + private keys. + :param key_defs: A definition of what keys should be created if they are + not already available + :param read_only: This function should not attempt to write anything + to a file system. + :return: An instantiated :py:class;`oidcmsg.key_jar.KeyJar` instance + """ + + if private_path: + if os.path.isfile(private_path): + _jwks = open(private_path, 'r').read() + _issuer = KeyIssuer() + _issuer.import_jwks(json.loads(_jwks)) + if key_defs: + _kb = _issuer[0] + _diff = key_diff(_kb, key_defs) + if _diff: + update_key_bundle(_kb, _diff) + if read_only: + logger.error('Not allowed to write to disc!') + else: + _issuer.set([_kb]) + jwks = _issuer.export_jwks(private=True) + fp = open(private_path, 'w') + fp.write(json.dumps(jwks)) + fp.close() + else: + _issuer = build_keyissuer(key_defs) + if not read_only: + jwks = _issuer.export_jwks(private=True) + head, tail = os.path.split(private_path) + if head and not os.path.isdir(head): + os.makedirs(head) + fp = open(private_path, 'w') + fp.write(json.dumps(jwks)) + fp.close() + + if public_path and not read_only: + jwks = _issuer.export_jwks() # public part + head, tail = os.path.split(public_path) + if head and not os.path.isdir(head): + os.makedirs(head) + fp = open(public_path, 'w') + fp.write(json.dumps(jwks)) + fp.close() + elif public_path: + if os.path.isfile(public_path): + _jwks = open(public_path, 'r').read() + _issuer = KeyIssuer() + _issuer.import_jwks(json.loads(_jwks)) + if key_defs: + _kb = _issuer[0] + _diff = key_diff(_kb, key_defs) + if _diff: + if read_only: + logger.error('Not allowed to write to disc!') + else: + update_key_bundle(_kb, _diff) + _issuer.set([_kb]) + jwks = _issuer.export_jwks() + fp = open(public_path, 'w') + fp.write(json.dumps(jwks)) + fp.close() + else: + _issuer = build_keyissuer(key_defs) + if not read_only: + _jwks = _issuer.export_jwks() + head, tail = os.path.split(public_path) + if head and not os.path.isdir(head): + os.makedirs(head) + fp = open(public_path, 'w') + fp.write(json.dumps(_jwks)) + fp.close() + else: + _issuer = build_keyissuer(key_defs) + + if _issuer is None: + raise ValueError('Could not find any keys') + + return _issuer diff --git a/src/cryptojwt/key_jar.py b/src/cryptojwt/key_jar.py index 69962f2..a393869 100755 --- a/src/cryptojwt/key_jar.py +++ b/src/cryptojwt/key_jar.py @@ -1,22 +1,22 @@ import json import logging -import os +from typing import List +from typing import Optional from requests import request from .jwe.jwe import alg2keytype as jwe_alg2keytype from .jws.utils import alg2keytype as jws_alg2keytype from .key_bundle import KeyBundle -from .key_bundle import build_key_bundle -from .key_bundle import key_diff -from .key_bundle import update_key_bundle +from .key_issuer import KeyIssuer +from .key_issuer import build_keyissuer +from .key_issuer import init_key_issuer +from .utils import deprecated_alias +from .utils import importer +from .utils import qualified_name __author__ = 'Roland Hedberg' -KEYLOADERR = "Failed to load %s key from '%s' (%s)" -REMOTE_FAILED = "Remote key update from '{}' failed, HTTP status {}" -MALFORMED = "Remote key update from {} failed, malformed JWKS." - logger = logging.getLogger(__name__) @@ -36,7 +36,8 @@ class KeyJar(object): """ A keyjar contains a number of KeyBundles sorted by owner/issuer """ def __init__(self, ca_certs=None, verify_ssl=True, keybundle_cls=KeyBundle, - remove_after=3600, httpc=None, httpc_params=None): + remove_after=3600, httpc=None, httpc_params=None, storage_conf=None, + abstract_storage_cls=None): """ KeyJar init function @@ -46,10 +47,21 @@ def __init__(self, ca_certs=None, verify_ssl=True, keybundle_cls=KeyBundle, :param remove_after: How long keys marked as inactive will remain in the key Jar. :param httpc: A HTTP client to use. Default is Requests request. :param httpc_params: HTTP request parameters + :param storage_conf: Storage configuration + :param abstract_storage_cls: Storage class. The only demand on a storage class is that it + should behave like a dictionary. :return: Keyjar instance """ + + if storage_conf is None: + self._issuers = {} + else: + if not abstract_storage_cls: + raise ValueError('Missing storage class specification') + self._issuers = abstract_storage_cls(storage_conf) + + self.storage_conf = storage_conf self.spec2key = {} - self.issuer_keys = {} self.ca_certs = ca_certs self.keybundle_cls = keybundle_cls self.remove_after = remove_after @@ -57,213 +69,168 @@ def __init__(self, ca_certs=None, verify_ssl=True, keybundle_cls=KeyBundle, self.httpc_params = httpc_params or {} # Now part of httpc_params # self.verify_ssl = verify_ssl - if not self.httpc_params: # backward compatibility + if not self.httpc_params: # backward compatibility self.httpc_params["verify"] = verify_ssl + def _issuer_ids(self) -> List[str]: + """ + Returns a list of issuer identifiers + + :return: + """ + return list(self._issuers.keys()) + + @deprecated_alias(issuer='issuer_id', owner='issuer_id') + def _get_issuer(self, issuer_id: str) -> Optional[KeyIssuer]: + """ + Return the KeyIssuer instance that has name == issuer_id + + :param issuer_id: The issuer identifiers + :return: A KeyIssuer instance or None + """ + + return self._issuers.get(issuer_id) + + @deprecated_alias(issuer='issuer_id', owner='issuer_id') + def _add_issuer(self, issuer_id) -> KeyIssuer: + _issuer = KeyIssuer(ca_certs=self.ca_certs, name=issuer_id, + keybundle_cls=self.keybundle_cls, + remove_after=self.remove_after, + httpc=self.httpc, httpc_params=self.httpc_params) + self._issuers[issuer_id] = _issuer + return _issuer + + def items(self): + """ + Get all owner ID's and their keys + + :return: list of 2-tuples (Owner ID., list of KeyBundles) + """ + return self._issuers.items() + def __repr__(self): - issuers = list(self.issuer_keys.keys()) + issuers = self._issuer_ids() return ''.format(issuers) - def add_url(self, issuer, url, **kwargs): + @deprecated_alias(issuer='issuer_id', owner='issuer_id') + def return_issuer(self, issuer_id): + """ + Return a KeyIssuer instance with name == issuer_id. + If none such was already initiated, create one. + + :param issuer_id: The issuer ID + :return: A KeyIssuer instance + """ + _issuer = self._get_issuer(issuer_id) + if _issuer is None: + return self._add_issuer(issuer_id) + return _issuer + + @deprecated_alias(issuer='issuer_id', owner='issuer_id') + def add_url(self, issuer_id: str, url: str, **kwargs) -> KeyBundle: """ Add a set of keys by url. This method will create a :py:class:`oidcmsg.key_bundle.KeyBundle` instance with the url as source specification. If no file format is given it's assumed that what's on the other side is a JWKS. - :param issuer: Who issued the keys + :param issuer_id: Who issued the keys :param url: Where can the key/-s be found :param kwargs: extra parameters for instantiating KeyBundle :return: A :py:class:`oidcmsg.oauth2.keybundle.KeyBundle` instance """ - if not url: - raise KeyError("No url given") - - if "/localhost:" in url or "/localhost/" in url: - _params = self.httpc_params.copy() - _params['verify'] = False - kb = self.keybundle_cls(source=url, httpc=self.httpc, - httpc_params=_params, **kwargs) - else: - kb = self.keybundle_cls(source=url, httpc=self.httpc, - httpc_params=self.httpc_params, **kwargs) - - kb.update() - self.add_kb(issuer, kb) - + issuer = self.return_issuer(issuer_id) + kb = issuer.add_url(url, **kwargs) return kb - def add_symmetric(self, issuer, key, usage=None): + @deprecated_alias(issuer='issuer_id', owner='issuer_id') + def add_symmetric(self, issuer_id, key, usage=None): """ Add a symmetric key. This is done by wrapping it in a key bundle cloak since KeyJar does not handle keys directly but only through key bundles. - :param issuer: Owner of the key + :param issuer_id: Owner of the key :param key: The key :param usage: What the key can be used for signing/signature verification (sig) and/or encryption/decryption (enc) """ - if issuer not in self.issuer_keys: - self.issuer_keys[issuer] = [] + issuer = self.return_issuer(issuer_id) + issuer.add_symmetric(key, usage=usage) - if usage is None: - self.issuer_keys[issuer].append( - self.keybundle_cls([{"kty": "oct", "key": key}])) - else: - for use in usage: - self.issuer_keys[issuer].append( - self.keybundle_cls([{"kty": "oct", "key": key, "use": use}])) - - def add_kb(self, issuer, kb): + @deprecated_alias(issuer='issuer_id', owner='issuer_id') + def add_kb(self, issuer_id, kb): """ Add a key bundle and bind it to an identifier - :param issuer: Owner of the keys in the key bundle + :param issuer_id: Owner of the keys in the key bundle :param kb: A :py:class:`oidcmsg.key_bundle.KeyBundle` instance """ - try: - self.issuer_keys[issuer].append(kb) - except KeyError: - self.issuer_keys[issuer] = [kb] + issuer = self.return_issuer(issuer_id) + issuer.add_kb(kb) + self._issuers[issuer_id] = issuer - def __setitem__(self, issuer, val): - """ - Bind one or a list of key bundles to a special identifier. - Will overwrite whatever was there before !! - - :param issuer: The owner of the keys in the key bundle/-s - :param val: A single or a list of KeyBundle instance - """ - if not isinstance(val, list): - val = [val] - - for kb in val: - if not isinstance(kb, KeyBundle): - raise ValueError('{} not an KeyBundle instance'.format(kb)) - - self.issuer_keys[issuer] = val - - def items(self): - """ - Get all owner ID's and their key bundles - - :return: list of 2-tuples (Owner ID., list of KeyBundles) - """ - return self.issuer_keys.items() - - def get(self, key_use, key_type="", owner="", kid=None, **kwargs): + @deprecated_alias(issuer='issuer_id', owner='issuer_id') + def get(self, key_use, key_type="", issuer_id="", kid=None, **kwargs): """ Get all keys that matches a set of search criteria :param key_use: A key useful for this usage (enc, dec, sig, ver) :param key_type: Type of key (rsa, ec, oct, ..) - :param owner: Who is the owner of the keys, "" == me (default) + :param issuer_id: Who is the owner of the keys, "" == me (default) :param kid: A Key Identifier :return: A possibly empty list of keys """ - if key_use in ["dec", "enc"]: - use = "enc" - else: - use = "sig" - - _kj = None - if owner != "": - try: - _kj = self.issuer_keys[owner] - except KeyError: - if owner.endswith("/"): - try: - _kj = self.issuer_keys[owner[:-1]] - except KeyError: - pass + _issuer = None + if issuer_id != "": + _issuer = self._get_issuer(issuer_id) + if _issuer is None: + if issuer_id.endswith("/"): + _issuer = self._get_issuer(issuer_id[:-1]) else: - try: - _kj = self.issuer_keys[owner + "/"] - except KeyError: - pass + _issuer = self._get_issuer(issuer_id + "/") else: - try: - _kj = self.issuer_keys[owner] - except KeyError: - pass + _issuer = self._get_issuer(issuer_id) - if _kj is None: + if _issuer is None: return [] - lst = [] - for bundle in _kj: - if key_type: - if key_use in ['ver', 'dec']: - _bkeys = bundle.get(key_type, only_active=False) - else: - _bkeys = bundle.get(key_type) - else: - _bkeys = bundle.keys() - for key in _bkeys: - if key.inactive_since and key_use != "sig": - # Skip inactive keys unless for signature verification - continue - if not key.use or use == key.use: - if kid: - if key.kid == kid: - lst.append(key) - break - else: - continue - else: - lst.append(key) - - # if elliptic curve, have to check if I have a key of the right curve - if key_type == "EC" and "alg" in kwargs: - name = "P-{}".format(kwargs["alg"][2:]) # the type - _lst = [] - for key in lst: - if name != key.crv: - continue - _lst.append(key) - lst = _lst - - if use == 'enc' and key_type == 'oct' and owner != '': - # Add my symmetric keys - for kb in self.issuer_keys['']: - for key in kb.get(key_type): - if key.inactive_since: - continue - if not key.use or key.use == use: - lst.append(key) - - return lst - - def get_signing_key(self, key_type="", owner="", kid=None, **kwargs): + return _issuer.get(key_use=key_use, key_type=key_type, kid=kid, **kwargs) + + @deprecated_alias(issuer='issuer_id', owner='issuer_id') + def get_signing_key(self, key_type="", issuer_id="", kid=None, **kwargs): """ Shortcut to use for signing keys only. :param key_type: Type of key (rsa, ec, oct, ..) - :param owner: Who is the owner of the keys, "" == me (default) + :param issuer_id: Who is the owner of the keys, "" == me (default) :param kid: A Key Identifier :param kwargs: Extra key word arguments :return: A possibly empty list of keys """ - return self.get("sig", key_type, owner, kid, **kwargs) + return self.get("sig", key_type, issuer_id, kid, **kwargs) - def get_verify_key(self, key_type="", owner="", kid=None, **kwargs): - return self.get("ver", key_type, owner, kid, **kwargs) + @deprecated_alias(issuer='issuer_id', owner='issuer_id') + def get_verify_key(self, key_type="", issuer_id="", kid=None, **kwargs): + return self.get("ver", key_type, issuer_id, kid, **kwargs) - def get_encrypt_key(self, key_type="", owner="", kid=None, **kwargs): - return self.get("enc", key_type, owner, kid, **kwargs) + @deprecated_alias(issuer='issuer_id', owner='issuer_id') + def get_encrypt_key(self, key_type="", issuer_id="", kid=None, **kwargs): + return self.get("enc", key_type, issuer_id, kid, **kwargs) - def get_decrypt_key(self, key_type="", owner="", kid=None, **kwargs): - return self.get("dec", key_type, owner, kid, **kwargs) + @deprecated_alias(issuer='issuer_id', owner='issuer_id') + def get_decrypt_key(self, key_type="", issuer_id="", kid=None, **kwargs): + return self.get("dec", key_type, issuer_id, kid, **kwargs) - def keys_by_alg_and_usage(self, issuer, alg, usage): + @deprecated_alias(issuer='issuer_id', owner='issuer_id') + def keys_by_alg_and_usage(self, issuer_id, alg, usage): """ Find all keys that can be used for a specific crypto algorithm and usage by key owner. - :param issuer: Key owner + :param issuer_id: Key owner :param alg: a crypto algorithm :param usage: What the key should be used for :return: A possibly empty list of keys @@ -273,40 +240,54 @@ def keys_by_alg_and_usage(self, issuer, alg, usage): else: ktype = jwe_alg2keytype(alg) - return self.get(usage, ktype, issuer) + return self.get(usage, ktype, issuer_id) - def get_issuer_keys(self, issuer): + @deprecated_alias(issuer='issuer_id', owner='issuer_id') + def get_issuer_keys(self, issuer_id): """ Get all the keys that belong to an entity. - :param issuer: The entity ID + :param issuer_id: The entity ID :return: A possibly empty list of keys """ - res = [] - for kbl in self.issuer_keys[issuer]: - res.extend(kbl.keys()) - return res + _issuer = self._get_issuer(issuer_id) + if _issuer is None: + raise KeyError(issuer_id) + return _issuer.all_keys() - def __contains__(self, item): - if item in self.issuer_keys: - return True - else: + @deprecated_alias(issuer='issuer_id', owner='issuer_id') + def __contains__(self, issuer_id): + _iss = self._get_issuer(issuer_id) + if _iss is None: return False + else: + return True - def __getitem__(self, owner=''): + @deprecated_alias(issuer='issuer_id', owner='issuer_id') + def __getitem__(self, issuer_id=''): """ - Get all the key bundles that belong to an entity. + Get the KeyIssuer with the name == issuer_id - :param owner: The entity ID - :return: A possibly empty list of key bundles + :param issuer_id: The entity ID + :return: A KeyIssuer instance """ - try: - return self.issuer_keys[owner] - except KeyError: - logger.debug( - "Owner '{}' not found, available key owners: {}".format( - owner, list(self.issuer_keys.keys()))) - raise + _issuer = self._get_issuer(issuer_id) + if _issuer is None: + raise KeyError(issuer_id) + return _issuer + + @deprecated_alias(issuer='issuer_id', owner='issuer_id') + def __setitem__(self, issuer_id, key_issuer): + """ + Set a KeyIssuer with the name == issuer_id + + :param issuer_id: The entity ID + :param key_issuer: KeyIssuer instance + """ + self._issuers[issuer_id] = key_issuer + + def set(self, issuer_id, issuer): + self[issuer_id] = issuer def owners(self): """ @@ -314,7 +295,7 @@ def owners(self): :return: A list of entity IDs """ - return list(self.issuer_keys.keys()) + return list(self._issuers.keys()) def match_owner(self, url): """ @@ -325,119 +306,136 @@ def match_owner(self, url): :param url: A URL :return: An issue entity ID that exists in the Key jar """ - for owner in self.issuer_keys.keys(): - if owner.startswith(url): - return owner + _iss = [i for i in self._issuers.keys() if i.startswith(url)] + if _iss: + return _iss[0] raise KeyError("No keys for '{}' in this keyjar".format(url)) def __str__(self): _res = {} - for _id, kbs in self.issuer_keys.items(): - _l = [] - for kb in kbs: - _l.extend(json.loads(kb.jwks())["keys"]) - _res[_id] = {"keys": _l} + for _id, _issuer in self._issuers.items(): + _res[_id] = _issuer.key_summary() return json.dumps(_res) - def load_keys(self, issuer, jwks_uri='', jwks=None, replace=False): + @deprecated_alias(issuer='issuer_id', owner='issuer_id') + def load_keys(self, issuer_id, jwks_uri='', jwks=None, replace=False): """ Fetch keys from another server :param jwks_uri: A URL pointing to a site that will return a JWKS :param jwks: A dictionary representation of a JWKS - :param issuer: The provider URL + :param issuer_id: The provider URL :param replace: If all previously gathered keys from this provider should be replace. :return: Dictionary with usage as key and keys as values """ - logger.debug("Initiating key bundle for issuer: %s" % issuer) + logger.debug("Initiating key bundle for issuer: %s" % issuer_id) - if replace or issuer not in self.issuer_keys: - self.issuer_keys[issuer] = [] + _issuer = self.return_issuer(issuer_id) + if replace: + _issuer.set([]) if jwks_uri: - self.add_url(issuer, jwks_uri) + _issuer.add_url(jwks_uri) elif jwks: # jwks should only be considered if no jwks_uri is present _keys = jwks['keys'] - self.issuer_keys[issuer].append(self.keybundle_cls(_keys)) + _issuer.add_kb(self.keybundle_cls(_keys)) + + self[issuer_id] = _issuer - def find(self, source, issuer): + @deprecated_alias(issuer='issuer_id', owner='issuer_id') + def find(self, source, issuer_id=None): """ Find a key bundle based on the source of the keys :param source: A source url - :param issuer: The issuer of keys - :return: A :py:class:`oidcmsg.key_bundle.KeyBundle` instance or None - """ - try: - for kb in self.issuer_keys[issuer]: - if kb.source == source: - return kb - except KeyError: - return None + :param issuer_id: The issuer of keys + :return: List of :py:class:`oidcmsg.key_bundle.KeyBundle` instances or None + """ + if issuer_id is None: + res = {} + for _, _issuer in self._issuers.items(): + kbs = _issuer.find(source) + if kbs: + res[_issuer.name] = kbs + else: + _issuer = self._get_issuer(issuer_id) + if _issuer is None: + return None + else: + res = _issuer.find(source) - return None + return res - def export_jwks(self, private=False, issuer="", usage=None): + @deprecated_alias(issuer='issuer_id', owner='issuer_id') + def export_jwks(self, private=False, issuer_id="", usage=None): """ Produces a dictionary that later can be easily mapped into a JSON string representing a JWKS. :param private: Whether it should be the private keys or the public - :param issuer: The entity ID. + :param issuer_id: The entity ID. :return: A dictionary with one key: 'keys' """ + _issuer = self._get_issuer(issuer_id=issuer_id) + if _issuer is None: + return {"keys": []} + keys = [] - for kb in self.issuer_keys[issuer]: + for kb in _issuer: keys.extend([k.serialize(private) for k in kb.keys() if - k.inactive_since == 0 and (usage is None or (hasattr(k, 'use') and k.use == usage))]) + k.inactive_since == 0 and ( + usage is None or (hasattr(k, 'use') and k.use == usage))]) return {"keys": keys} - def export_jwks_as_json(self, private=False, issuer=""): + @deprecated_alias(issuer='issuer_id', owner='issuer_id') + def export_jwks_as_json(self, private=False, issuer_id=""): """ Export a JWKS as a JSON document. :param private: Whether it should be the private keys or the public - :param issuer: The entity ID. + :param issuer_id: The entity ID. :return: A JSON representation of a JWKS """ - return json.dumps(self.export_jwks(private, issuer)) + return json.dumps(self.export_jwks(private, issuer_id)) - def import_jwks(self, jwks, issuer): + @deprecated_alias(issuer='issuer_id', owner='issuer_id') + def import_jwks(self, jwks, issuer_id): """ Imports all the keys that are represented in a JWKS :param jwks: Dictionary representation of a JWKS - :param issuer: Who 'owns' the JWKS + :param issuer_id: Who 'owns' the JWKS """ try: _keys = jwks["keys"] except KeyError: raise ValueError('Not a proper JWKS') - else: - try: - self.issuer_keys[issuer].append( - self.keybundle_cls(_keys, httpc=self.httpc, httpc_params=self.httpc_params)) - except KeyError: - self.issuer_keys[issuer] = [self.keybundle_cls( - _keys, httpc=self.httpc, httpc_params=self.httpc_params)] - def import_jwks_as_json(self, jwks, issuer): + if _keys: + _issuer = self.return_issuer(issuer_id=issuer_id) + _issuer.add(self.keybundle_cls(_keys, httpc=self.httpc, + httpc_params=self.httpc_params)) + self[issuer_id] = _issuer + + @deprecated_alias(issuer='issuer_id', owner='issuer_id') + def import_jwks_as_json(self, jwks, issuer_id): """ Imports all the keys that are represented in a JWKS expressed as a JSON object :param jwks: JSON representation of a JWKS - :param issuer: Who 'owns' the JWKS + :param issuer_id: Who 'owns' the JWKS """ - return self.import_jwks(json.loads(jwks), issuer) + return self.import_jwks(json.loads(jwks), issuer_id) - def import_jwks_from_file(self, filename, issuer): + @deprecated_alias(issuer='issuer_id', owner='issuer_id') + def import_jwks_from_file(self, filename, issuer_id): with open(filename) as jwks_file: - self.import_jwks_as_json(jwks_file.read(), issuer) + self.import_jwks_as_json(jwks_file.read(), issuer_id) def __eq__(self, other): if not isinstance(other, KeyJar): @@ -449,16 +447,14 @@ def __eq__(self, other): # Keys per issuer must be the same for iss in self.owners(): - sk = self.get_issuer_keys(iss) - ok = other.get_issuer_keys(iss) - if len(sk) != len(ok): - return False - - if not any(k in ok for k in sk): + if self[iss] != other[iss]: return False return True + def __delitem__(self, key): + del self._issuers[key] + def remove_outdated(self, when=0): """ Goes through the complete list of issuers and for each of them removes @@ -471,56 +467,49 @@ def remove_outdated(self, when=0): :param when: To facilitate testing """ - for iss in list(self.owners()): - _kbl = [] - for kb in self.issuer_keys[iss]: - kb.remove_outdated(self.remove_after, when=when) - if len(kb): - _kbl.append(kb) - if _kbl: - self.issuer_keys[iss] = _kbl - else: - del self.issuer_keys[iss] + for _id, _issuer in self._issuers.items(): + _before = len(_issuer) + _issuer.remove_outdated(when) - def _add_key(self, keys, issuer, use, key_type='', kid='', + @deprecated_alias(issuer='issuer_id', owner='issuer_id') + def _add_key(self, keys, issuer_id, use, key_type='', kid='', no_kid_issuer=None, allow_missing_kid=False): - if issuer not in self: - logger.error('Issuer "{}" not in keyjar'.format(issuer)) + _issuer = self._get_issuer(issuer_id) + if _issuer is None: + logger.error('Issuer "{}" not in keyjar'.format(issuer_id)) return keys - logger.debug('Key set summary for {}: {}'.format( - issuer, key_summary(self, issuer))) + logger.debug('Key summary for {}: {}'.format(issuer_id, _issuer.key_summary())) if kid: - for _key in self.get(key_use=use, owner=issuer, kid=kid, key_type=key_type): + for _key in _issuer.get(use, kid=kid, key_type=key_type): if _key and _key not in keys: keys.append(_key) return keys else: try: - kl = self.get(key_use=use, owner=issuer, key_type=key_type) + _add_keys = _issuer.get(use, key_type=key_type) except KeyError: pass else: - if len(kl) == 0: + if len(_add_keys) == 0: return keys - elif len(kl) == 1: - if kl[0] not in keys: - keys.append(kl[0]) + elif len(_add_keys) == 1: + if _add_keys[0] not in keys: + keys.append(_add_keys[0]) elif allow_missing_kid: - keys.extend(kl) + keys.extend(_add_keys) elif no_kid_issuer: try: - allowed_kids = no_kid_issuer[issuer] + allowed_kids = no_kid_issuer[issuer_id] except KeyError: return keys else: if allowed_kids: - keys.extend( - [k for k in kl if k.kid in allowed_kids]) + keys.extend([k for k in _add_keys if k.kid in allowed_kids]) else: - keys.extend(kl) + keys.extend(_add_keys) return keys def get_jwt_decrypt_keys(self, jwt, **kwargs): @@ -544,7 +533,7 @@ def get_jwt_decrypt_keys(self, jwt, **kwargs): logger.info('Missing kid') _kid = '' - keys = self.get(key_use='enc', owner='', key_type=_key_type) + keys = self.get(key_use='enc', issuer_id='', key_type=_key_type) try: _aud = kwargs['aud'] @@ -607,10 +596,10 @@ def get_jwt_verify_keys(self, jwt, **kwargs): _kid, nki, allow_missing_kid) if _key_type == 'oct': - keys.extend(self.get(key_use='sig', owner='', + keys.extend(self.get(key_use='sig', issuer_id='', key_type=_key_type)) else: # No issuer, just use all keys I have - keys = self.get(key_use='sig', owner='', key_type=_key_type) + keys = self.get(key_use='sig', issuer_id='', key_type=_key_type) # Only want the appropriate keys. keys = [k for k in keys if k.appropriate_for('verify')] @@ -622,34 +611,48 @@ def copy(self): :return: A :py:class:`oidcmsg.key_jar.KeyJar` instance """ - kj = KeyJar() - for issuer in self.owners(): - kj[issuer] = [kb.copy() for kb in self[issuer]] + if self.storage_conf: + _conf = self.storage_conf.get('KeyJar') + if _conf: + _label = self.storage_conf.get('label') + if _label: + self.storage_conf['KeyJar']['label'] = '{}.copy'.format(_label) + + kj = KeyJar(storage_conf=self.storage_conf) + for _id, _issuer in self._issuers.items(): + _issuer_copy = KeyIssuer() + _issuer_copy.set([kb.copy() for kb in _issuer]) + kj[_id] = _issuer_copy kj.httpc_params = self.httpc_params kj.httpc = self.httpc return kj def __len__(self): - keys = 0 - for iss in list(self.owners()): - for kb in self.issuer_keys[iss]: - if len(kb): - keys += len(kb) - return keys + return len(self._issuers) - def dump(self): + def dump(self, exclude=None): """ Returns the key jar content as dictionary :return: A dictionary """ - info = {} - for iss in list(self.owners()): - info[iss] = [] - for kb in self.issuer_keys[iss]: - info[iss].append(kb.dump()) + info = { + # 'storage_conf': self.storage_conf, + 'spec2key': self.spec2key, + 'ca_certs': self.ca_certs, + 'keybundle_cls': qualified_name(self.keybundle_cls), + 'remove_after': self.remove_after, + 'httpc_params': self.httpc_params} + + _issuers = {} + for _id, _issuer in self._issuers.items(): + if exclude and _issuer.name in exclude: + continue + _issuers[_id] = _issuer.dump() + info['issuers'] = _issuers + return info def load(self, info): @@ -658,15 +661,48 @@ def load(self, info): :param info: A dictionary with the information :return: """ - for iss, kbs in info.items(): - self.issuer_keys[iss] = [KeyBundle().load(val) for val in kbs] + # self.storage_conf = info['storage_conf'] + self.spec2key = info['spec2key'] + self.ca_certs = info['ca_certs'] + self.keybundle_cls = importer(info['keybundle_cls']) + self.remove_after = info['remove_after'] + self.httpc_params = info['httpc_params'] + + for _issuer_id, _issuer_desc in info['issuers'].items(): + self._issuers[_issuer_id] = KeyIssuer().load(_issuer_desc) + return self + + @deprecated_alias(issuer='issuer_id', owner='issuer_id') + def key_summary(self, issuer_id): + _issuer = self._get_issuer(issuer_id) + if _issuer is not None: + return _issuer.key_summary() + + raise KeyError('Unknown Issuer ID: "{}"'.format(issuer_id)) + + def update(self): + """ + Go through the whole key jar, key issuer by key issuer and update them one + by one. + """ + ids = self._issuers.keys() + for _id in ids: + _issuer = self[_id] + _issuer.update() + self[_id] = _issuer + + @deprecated_alias(issuer='issuer_id', owner='issuer_id') + def rotate_keys(self, key_conf, kid_template="", issuer_id=''): + _issuer = self[issuer_id] + _issuer.rotate_keys(key_conf=key_conf, kid_template=kid_template) + self[issuer_id] = _issuer return self # ============================================================================= -def build_keyjar(key_conf, kid_template="", keyjar=None, owner=''): +def build_keyjar(key_conf, kid_template="", keyjar=None, issuer_id='', storage_conf=None): """ Builds a :py:class:`oidcmsg.key_jar.KeyJar` instance or adds keys to an existing KeyJar based on a key specification. @@ -704,59 +740,26 @@ def build_keyjar(key_conf, kid_template="", keyjar=None, owner=''): :param kid_template: A template by which to build the key IDs. If no kid_template is given then the built-in function add_kid() will be used. :param keyjar: If an KeyJar instance the new keys are added to this key jar. - :param owner: The default owner of the keys in the key jar. + :param issuer_id: The default owner of the keys in the key jar. + :param storage_conf: Storage configuration :return: A KeyJar instance """ - if keyjar is None: - keyjar = KeyJar() + _issuer = build_keyissuer(key_conf, kid_template, issuer_id=issuer_id) + if _issuer is None: + return None - bundle = build_key_bundle(key_conf, kid_template) + if keyjar is None: + keyjar = KeyJar(storage_conf=storage_conf) - keyjar.add_kb(owner, bundle) + keyjar[issuer_id] = _issuer return keyjar -def update_keyjar(keyjar): - """ - Go through the whole key jar, key bundle by key bundle and update them one - by one. - - :param keyjar: The key jar to update - """ - for iss, kbl in keyjar.items(): - for kb in kbl: - kb.update() - - -def key_summary(keyjar, issuer): - """ - Return a text representation of the keyjar. - - :param keyjar: A :py:class:`oidcmsg.key_jar.KeyJar` instance - :param issuer: Which key owner that we are looking at - :return: A text representation of the keys - """ - try: - kbl = keyjar[issuer] - except KeyError: - return '' - else: - key_list = [] - for kb in kbl: - for key in kb.keys(): - if key.inactive_since: - key_list.append( - '*{}:{}:{}'.format(key.kty, key.use, key.kid)) - else: - key_list.append( - '{}:{}:{}'.format(key.kty, key.use, key.kid)) - return ', '.join(key_list) - - -def init_key_jar(public_path='', private_path='', key_defs='', owner='', - read_only=True): +@deprecated_alias(issuer='issuer_id', owner='issuer_id') +def init_key_jar(public_path='', private_path='', key_defs='', issuer_id='', read_only=True, + storage_conf=None, abstract_storage_cls=None): """ A number of cases here: @@ -788,84 +791,30 @@ def init_key_jar(public_path='', private_path='', key_defs='', owner='', The keys stored in the KeyJar will be stored under the '' identifier. - :param public_path: A file path to a file that contains a JWKS with public - keys - :param private_path: A file path to a file that contains a JWKS with - private keys. - :param key_defs: A definition of what keys should be created if they are - not already available - :param owner: The owner of the keys - :param read_only: This function should not attempt to write anything - to a file system. + :param public_path: A file path to a file that contains a JWKS with public keys + :param private_path: A file path to a file that contains a JWKS with private keys. + :param key_defs: A definition of what keys should be created if they are not already available + :param issuer_id: The owner of the keys + :param read_only: This function should not attempt to write anything to a file system. :return: An instantiated :py:class;`oidcmsg.key_jar.KeyJar` instance """ - if private_path: - if os.path.isfile(private_path): - _jwks = open(private_path, 'r').read() - _kj = KeyJar() - _kj.import_jwks(json.loads(_jwks), owner) - if key_defs: - _kb = _kj.issuer_keys[owner][0] - _diff = key_diff(_kb, key_defs) - if _diff: - update_key_bundle(_kb, _diff) - if read_only: - logger.error('Not allowed to write to disc!') - else: - _kj.issuer_keys[owner] = [_kb] - jwks = _kj.export_jwks(private=True, issuer=owner) - fp = open(private_path, 'w') - fp.write(json.dumps(jwks)) - fp.close() - else: - _kj = build_keyjar(key_defs, owner=owner) - if not read_only: - jwks = _kj.export_jwks(private=True, issuer=owner) - head, tail = os.path.split(private_path) - if head and not os.path.isdir(head): - os.makedirs(head) - fp = open(private_path, 'w') - fp.write(json.dumps(jwks)) - fp.close() - - if public_path and not read_only: - jwks = _kj.export_jwks(issuer=owner) # public part - head, tail = os.path.split(public_path) - if head and not os.path.isdir(head): - os.makedirs(head) - fp = open(public_path, 'w') - fp.write(json.dumps(jwks)) - fp.close() - elif public_path: - if os.path.isfile(public_path): - _jwks = open(public_path, 'r').read() - _kj = KeyJar() - _kj.import_jwks(json.loads(_jwks), owner) - if key_defs: - _kb = _kj.issuer_keys[owner][0] - _diff = key_diff(_kb, key_defs) - if _diff: - if read_only: - logger.error('Not allowed to write to disc!') - else: - update_key_bundle(_kb, _diff) - _kj.issuer_keys[owner] = [_kb] - jwks = _kj.export_jwks(issuer=owner) - fp = open(public_path, 'w') - fp.write(json.dumps(jwks)) - fp.close() - else: - _kj = build_keyjar(key_defs, owner=owner) - if not read_only: - _jwks = _kj.export_jwks(issuer=owner) - head, tail = os.path.split(public_path) - if head and not os.path.isdir(head): - os.makedirs(head) - fp = open(public_path, 'w') - fp.write(json.dumps(_jwks)) - fp.close() - else: - _kj = build_keyjar(key_defs, owner=owner) - - return _kj + _issuer = init_key_issuer(public_path=public_path, private_path=private_path, + key_defs=key_defs, read_only=read_only) + + if _issuer is None: + raise ValueError('Could not find any keys') + + keyjar = KeyJar(storage_conf=storage_conf, abstract_storage_cls=abstract_storage_cls) + keyjar[issuer_id] = _issuer + return keyjar + + +def rotate_keys(key_conf, keyjar, kid_template="", issuer_id=''): + new_keys = build_keyissuer(key_conf, kid_template, issuer_id=issuer_id) + _issuer = keyjar[issuer_id] + _issuer.mark_all_keys_as_inactive() + for kb in new_keys: + _issuer.add_kb(kb) + keyjar[issuer_id] = _issuer + return keyjar diff --git a/src/cryptojwt/serialize/__init__.py b/src/cryptojwt/serialize/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/cryptojwt/serialize/item.py b/src/cryptojwt/serialize/item.py new file mode 100644 index 0000000..a087c8d --- /dev/null +++ b/src/cryptojwt/serialize/item.py @@ -0,0 +1,16 @@ +import json + +from cryptojwt import key_issuer + + +class KeyIssuer: + @staticmethod + def serialize(item: key_issuer.KeyIssuer) -> str: + """ Convert from KeyIssuer to JSON """ + return json.dumps(item.dump()) + + def deserialize(self, spec: str) -> key_issuer.KeyIssuer: + """ Convert from JSON to KeyIssuer """ + _dict = json.loads(spec) + issuer = key_issuer.KeyIssuer().load(_dict) + return issuer diff --git a/src/cryptojwt/tools/jwtpeek.py b/src/cryptojwt/tools/jwtpeek.py index df0450f..4eb1b51 100755 --- a/src/cryptojwt/tools/jwtpeek.py +++ b/src/cryptojwt/tools/jwtpeek.py @@ -7,6 +7,8 @@ import os import sys +from cryptojwt.key_issuer import KeyIssuer + from cryptojwt.jwe import jwe from cryptojwt.jwk.hmac import SYMKey from cryptojwt.jwk.jwk import key_from_jwk_dict @@ -114,9 +116,9 @@ def main(): keys.append(_key) if args.jwks: - _k = KeyJar() - _k.import_jwks(open(args.jwks).read(), "") - keys.extend(_k.issuer_keys("")) + _iss = KeyIssuer() + _iss.import_jwks(open(args.jwks).read()) + keys.extend(_iss.all_keys()) if args.jwks_url: _kb = KeyBundle(source=args.jwks_url) diff --git a/src/cryptojwt/utils.py b/src/cryptojwt/utils.py index 023c909..563933b 100644 --- a/src/cryptojwt/utils.py +++ b/src/cryptojwt/utils.py @@ -1,12 +1,16 @@ import base64 +import functools +import importlib import json import re import struct +import warnings from binascii import unhexlify from typing import List from cryptojwt.exception import BadSyntax + # --------------------------------------------------------------------------- # Helper functions @@ -201,3 +205,57 @@ def deser(val): _val = val return base64_to_long(_val) + + +def modsplit(name): + """Split importable""" + if ':' in name: + _part = name.split(':') + if len(_part) != 2: + raise ValueError("Syntax error: {s}") + return _part[0], _part[1] + + _part = name.split('.') + if len(_part) < 2: + raise ValueError("Syntax error: {s}") + + return '.'.join(_part[:-1]), _part[-1] + + +def importer(name): + """Import by name""" + _part = modsplit(name) + module = importlib.import_module(_part[0]) + return getattr(module, _part[1]) + + +def qualified_name(cls): + return cls.__module__ + "." + cls.__name__ + + +# This is borrowed from +# https://stackoverflow.com/questions/49802412/how-to-implement-deprecation-in-python-with +# -argument-alias +# cudos to https://stackoverflow.com/users/2357112/user2357112-supports-monica + +def deprecated_alias(**aliases): + def deco(f): + @functools.wraps(f) + def wrapper(*args, **kwargs): + rename_kwargs(f.__name__, kwargs, aliases) + return f(*args, **kwargs) + + return wrapper + + return deco + + +def rename_kwargs(func_name, kwargs, aliases): + for alias, new in aliases.items(): + if alias in kwargs: + if new in kwargs: + raise TypeError('{} received both {} and {}'.format( + func_name, alias, new)) + warnings.warn('{} is deprecated; use {}'.format(alias, new), + DeprecationWarning) + kwargs[new] = kwargs.pop(alias) diff --git a/tests/cert.der b/tests/cert.der new file mode 100644 index 0000000..a0663c6 Binary files /dev/null and b/tests/cert.der differ diff --git a/tests/test_02_jwk.py b/tests/test_02_jwk.py index e248346..ea1e6c2 100644 --- a/tests/test_02_jwk.py +++ b/tests/test_02_jwk.py @@ -12,14 +12,17 @@ from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives.asymmetric.ec import generate_private_key + from cryptojwt.exception import DeSerializationNotPossible from cryptojwt.exception import UnsupportedAlgorithm from cryptojwt.exception import WrongUsage from cryptojwt.jwk import JWK +from cryptojwt.jwk import calculate_x5t +from cryptojwt.jwk import certificate_fingerprint from cryptojwt.jwk import pem_hash from cryptojwt.jwk import pems_to_x5c -from cryptojwt.jwk.ec import NIST2SEC from cryptojwt.jwk.ec import ECKey +from cryptojwt.jwk.ec import NIST2SEC from cryptojwt.jwk.hmac import SYMKey from cryptojwt.jwk.hmac import new_sym_key from cryptojwt.jwk.hmac import sha256_digest @@ -625,7 +628,7 @@ def test_dump_load(): def test_key_ops(): sk = SYMKey( key='df34db91c16613deba460752522d28f6ebc8a73d0d9185836270c26b', - alg = "HS256", + alg="HS256", key_ops=["sign", "verify"] ) @@ -637,9 +640,9 @@ def test_key_ops_and_use(): with pytest.raises(ValueError): SYMKey( key='df34db91c16613deba460752522d28f6ebc8a73d0d9185836270c26b', - alg = "HS256", + alg="HS256", key_ops=["sign", "verify"], - use = "sig" + use="sig" ) @@ -649,9 +652,45 @@ def test_pem_to_x5c(): x5c = pems_to_x5c([cert_chain]) assert len(x5c) == 1 - assert x5c[0] == 'MIIB2jCCAUOgAwIBAgIBATANBgkqhkiG9w0BAQUFADA0MRgwFgYDVQQDEw9UaGUgY29kZSB0ZXN0ZXIxGDAWBgNVBAoTD1VtZWEgVW5pdmVyc2l0eTAeFw0xMjEwMDQwMDIzMDNaFw0xMzEwMDQwMDIzMDNaMDIxCzAJBgNVBAYTAlNFMSMwIQYDVQQDExpPcGVuSUQgQ29ubmVjdCBUZXN0IFNlcnZlcjCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEAwf+wiusGhA+gleZYQAOPQlNUIucPiqXdPVyieDqQbXXOPBe3nuggtVzeq7pVFH1dZz4dY2Q2LA5DaegvP8kRvoSB/87ds3dy3Rfym/GUSc5B0l1TgEobcyaep8jguRoHto6GWHfCfKqoUYZq4N8vh4LLMQwLR6zi6Jtu82nB5k8CAwEAATANBgkqhkiG9w0BAQUFAAOBgQCsTntG4dfW5kO/Qle6uBhIhZU+3IreIPmbwzpXoCbcgjRa01z6WiBLwDC1RLAL7ucaF/EVlUq4e0cNXKt4ESGNc1xHISOMLetwvS1SN5tKWA9HNua/SaqRtiShxLUjPjmrtpUgotLNDRvUYnTdTT1vhZar7TSPr1yObirjvz/qLw==' + assert x5c[ + 0] == \ + 'MIIB2jCCAUOgAwIBAgIBATANBgkqhkiG9w0BAQUFADA0MRgwFgYDVQQDEw9UaGUgY29kZSB0ZXN0ZXIxGDAWBgNVBAoTD1VtZWEgVW5pdmVyc2l0eTAeFw0xMjEwMDQwMDIzMDNaFw0xMzEwMDQwMDIzMDNaMDIxCzAJBgNVBAYTAlNFMSMwIQYDVQQDExpPcGVuSUQgQ29ubmVjdCBUZXN0IFNlcnZlcjCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEAwf+wiusGhA+gleZYQAOPQlNUIucPiqXdPVyieDqQbXXOPBe3nuggtVzeq7pVFH1dZz4dY2Q2LA5DaegvP8kRvoSB/87ds3dy3Rfym/GUSc5B0l1TgEobcyaep8jguRoHto6GWHfCfKqoUYZq4N8vh4LLMQwLR6zi6Jtu82nB5k8CAwEAATANBgkqhkiG9w0BAQUFAAOBgQCsTntG4dfW5kO/Qle6uBhIhZU+3IreIPmbwzpXoCbcgjRa01z6WiBLwDC1RLAL7ucaF/EVlUq4e0cNXKt4ESGNc1xHISOMLetwvS1SN5tKWA9HNua/SaqRtiShxLUjPjmrtpUgotLNDRvUYnTdTT1vhZar7TSPr1yObirjvz/qLw==' def test_pem_hash(): _hash = pem_hash(full_path("cert.pem")) - assert _hash \ No newline at end of file + assert _hash + + +def test_certificate_fingerprint(): + with open(full_path('cert.der'), 'rb') as cert_file: + der = cert_file.read() + + res = certificate_fingerprint(der) + assert res == '01:DF:F1:D4:5F:21:7B:2E:3A:A2:D8:CA:13:4C:41:66:03:A1:EF:3E:7B:5E:8B:69:04:5E' \ + ':80:8B:55:49:F1:48' + + res = certificate_fingerprint(der, 'sha1') + assert res == 'CA:CF:21:9E:72:00:CD:1C:CA:FD:4F:6D:84:6B:9E:E8:74:80:47:64' + + res = certificate_fingerprint(der, 'md5') + assert res == '1B:2B:3B:F8:49:EE:2A:2C:C1:C7:6C:88:86:AB:C6:EE' + + with pytest.raises(UnsupportedAlgorithm): + certificate_fingerprint(der, 'foo') + + +# def test_generate_and_store_rsa_key(): +# priv_key = generate_and_store_rsa_key(filename=full_path('temp_rsa.key')) + + +def test_x5t_calculation(): + with open(full_path('cert.der'), 'rb') as cert_file: + der = cert_file.read() + + x5t = calculate_x5t(der) + assert x5t == b'Q0FDRjIxOUU3MjAwQ0QxQ0NBRkQ0RjZEODQ2QjlFRTg3NDgwNDc2NA==' + + x5t_s256 = calculate_x5t(der, 'sha256') + assert x5t_s256 == \ + b'MDFERkYxRDQ1RjIxN0IyRTNBQTJEOENBMTM0QzQxNjYwM0ExRUYzRTdCNUU4QjY5MDQ1RTgwOEI1NTQ5RjE0OA==' diff --git a/tests/test_03_key_bundle.py b/tests/test_03_key_bundle.py index afe9f18..d61cb6c 100755 --- a/tests/test_03_key_bundle.py +++ b/tests/test_03_key_bundle.py @@ -542,6 +542,7 @@ def test_httpc_params_1(): httpc_params=httpc_params) assert kb.do_remote() + @pytest.mark.network def test_httpc_params_2(): httpc_params = {'timeout': 0} @@ -989,6 +990,7 @@ def test_remote(): assert kb2.imp_jwks assert kb2.last_updated + def test_remote_not_modified(): source = 'https://example.com/keys.json' headers = { diff --git a/tests/test_04_key_issuer.py b/tests/test_04_key_issuer.py new file mode 100755 index 0000000..62bff7b --- /dev/null +++ b/tests/test_04_key_issuer.py @@ -0,0 +1,683 @@ +import os +import shutil +import time + +import pytest + +from cryptojwt.exception import JWKESTException +from cryptojwt.key_bundle import KeyBundle +from cryptojwt.key_bundle import keybundle_from_local_file +from cryptojwt.key_issuer import KeyIssuer +from cryptojwt.key_issuer import build_keyissuer +from cryptojwt.key_issuer import init_key_issuer + +__author__ = 'Roland Hedberg' + +BASE_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), + "test_keys")) +RSAKEY = os.path.join(BASE_PATH, "cert.key") +RSA0 = os.path.join(BASE_PATH, "rsa.key") +EC0 = os.path.join(BASE_PATH, "ec.key") +BASEDIR = os.path.abspath(os.path.dirname(__file__)) + + +def full_path(local_file): + return os.path.join(BASEDIR, local_file) + + +JWK0 = { + "keys": [ + { + 'kty': 'RSA', 'e': 'AQAB', 'kid': "abc", + 'n': + 'wf-wiusGhA-gleZYQAOPQlNUIucPiqXdPVyieDqQbXXOPBe3nuggtVzeq7pVFH1dZz4dY2Q2LA5DaegvP8kRvoSB_87ds3dy3Rfym_GUSc5' + 'B0l1TgEobcyaep8jguRoHto6GWHfCfKqoUYZq4N8vh4LLMQwLR6zi6Jtu82nB5k8' + } + ] +} + +JWK1 = { + "keys": [ + { + "n": + "zkpUgEgXICI54blf6iWiD2RbMDCOO1jV0VSff1MFFnujM4othfMsad7H1kRo50YM5S_X9TdvrpdOfpz5aBaKFhT6Ziv0nhtcekq1eRl8" + "mjBlvGKCE5XGk-0LFSDwvqgkJoFYInq7bu0a4JEzKs5AyJY75YlGh879k1Uu2Sv3ZZOunfV1O1Orta" + "-NvS-aG_jN5cstVbCGWE20H0vF" + "VrJKNx0Zf-u-aA-syM4uX7wdWgQ" + "-owoEMHge0GmGgzso2lwOYf_4znanLwEuO3p5aabEaFoKNR4K6GjQcjBcYmDEE4CtfRU9AEmhcD1k" + "leiTB9TjPWkgDmT9MXsGxBHf3AKT5w", + "e": "AQAB", "kty": "RSA", "kid": "rsa1" + }, + { + "k": + "YTEyZjBlMDgxMGI4YWU4Y2JjZDFiYTFlZTBjYzljNDU3YWM0ZWNiNzhmNmFlYTNkNTY0NzMzYjE", + "kty": "oct" + }, + ] +} + +JWK2 = { + "keys": [ + { + "e": "AQAB", + "issuer": "https://login.microsoftonline.com/{tenantid}/v2.0/", + "kid": "kriMPdmBvx68skT8-mPAB3BseeA", + "kty": "RSA", + "n": + "kSCWg6q9iYxvJE2NIhSyOiKvqoWCO2GFipgH0sTSAs5FalHQosk9ZNTztX0ywS_AHsBeQPqYygfYVJL6_EgzVuwRk5txr9e3n1um" + "l94fLyq_AXbwo9yAduf4dCHTP8CWR1dnDR" + "-Qnz_4PYlWVEuuHHONOw_blbfdMjhY" + "-C_BYM2E3pRxbohBb3x__CfueV7ddz2LYiH3" + "wjz0QS_7kjPiNCsXcNyKQEOTkbHFi3mu0u13SQwNddhcynd_GTgWN8A" + "-6SN1r4hzpjFKFLbZnBt77ACSiYx-IHK4Mp-NaVEi5wQt" + "SsjQtI--XsokxRDqYLwus1I1SihgbV_STTg5enufuw", + "use": "sig", + "x5c": [ + "MIIDPjCCAiqgAwIBAgIQsRiM0jheFZhKk49YD0SK1TAJBgUrDgMCHQUAMC0xKzApBgNVBAMTImFjY291bnRzLmFjY2Vzc2NvbnRyb" + "2wud2luZG93cy5uZXQwHhcNMTQwMTAxMDcwMDAwWhcNMTYwMTAxMDcwMDAwWjAtMSswKQYDVQQDEyJhY2NvdW50cy5hY2Nlc3Njb2" + "50cm9sLndpbmRvd3MubmV0MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAkSCWg6q9iYxvJE2NIhSyOiKvqoWCO2GFipg" + "H0sTSAs5FalHQosk9ZNTztX0ywS/AHsBeQPqYygfYVJL6" + "/EgzVuwRk5txr9e3n1uml94fLyq/AXbwo9yAduf4dCHTP8CWR1dnDR+Q" + "nz/4PYlWVEuuHHONOw/blbfdMjhY+C/BYM2E3pRxbohBb3x" + "//CfueV7ddz2LYiH3wjz0QS/7kjPiNCsXcNyKQEOTkbHFi3mu0u13S" + "QwNddhcynd/GTgWN8A+6SN1r4hzpjFKFLbZnBt77ACSiYx+IHK4Mp" + "+NaVEi5wQtSsjQtI++XsokxRDqYLwus1I1SihgbV/STTg5en" + "ufuwIDAQABo2IwYDBeBgNVHQEEVzBVgBDLebM6bK3BjWGqIBrBNFeNoS8wLTErMCkGA1UEAxMiYWNjb3VudHMuYWNjZXNzY29udHJ" + "vbC53aW5kb3dzLm5ldIIQsRiM0jheFZhKk49YD0SK1TAJBgUrDgMCHQUAA4IBAQCJ4JApryF77EKC4zF5bUaBLQHQ1PNtA1uMDbdN" + "VGKCmSf8M65b8h0NwlIjGGGy" + "/unK8P6jWFdm5IlZ0YPTOgzcRZguXDPj7ajyvlVEQ2K2ICvTYiRQqrOhEhZMSSZsTKXFVwNfW6ADD" + "kN3bvVOVbtpty+nBY5UqnI7xbcoHLZ4wYD251uj5" + "+lo13YLnsVrmQ16NCBYq2nQFNPuNJw6t3XUbwBHXpF46aLT1/eGf/7Xx6iy8y" + "PJX4DyrpFTutDz882RWofGEO5t4Cw+zZg70dJ/hH/ODYRMorfXEW" + "+8uKmXMKmX2wyxMKvfiPbTy5LmAU8Jvjs2tLg4rOBcXWLAIarZ" + ], + "x5t": "kriMPdmBvx68skT8-mPAB3BseeA" + }, + { + "e": "AQAB", + "issuer": "https://login.microsoftonline.com/{tenantid}/v2.0/", + "kid": "MnC_VZcATfM5pOYiJHMba9goEKY", + "kty": "RSA", + "n": + "vIqz-4-ER_vNWLON9yv8hIYV737JQ6rCl6XfzOC628seYUPf0TaGk91CFxefhzh23V9Tkq" + "-RtwN1Vs_z57hO82kkzL-cQHZX3bMJ" + "D-GEGOKXCEXURN7VMyZWMAuzQoW9vFb1k3cR1RW_EW_P" + "-C8bb2dCGXhBYqPfHyimvz2WarXhntPSbM5XyS5v5yCw5T_Vuwqqsio3" + "V8wooWGMpp61y12NhN8bNVDQAkDPNu2DT9DXB1g0CeFINp_KAS_qQ2Kq6TSvRHJqxRR68RezYtje9KAqwqx4jxlmVAQy0T3-T-IA" + "bsk1wRtWDndhO6s1Os-dck5TzyZ_dNOhfXgelixLUQ", + "use": "sig", + "x5c": [ + "MIIC4jCCAcqgAwIBAgIQQNXrmzhLN4VGlUXDYCRT3zANBgkqhkiG9w0BAQsFADAtMSswKQYDVQQDEyJhY2NvdW50cy5hY2Nlc3Njb" + "250cm9sLndpbmRvd3MubmV0MB4XDTE0MTAyODAwMDAwMFoXDTE2MTAyNzAwMDAwMFowLTErMCkGA1UEAxMiYWNjb3VudHMuYWNjZX" + "NzY29udHJvbC53aW5kb3dzLm5ldDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBALyKs" + "/uPhEf7zVizjfcr/ISGFe9+yUO" + "qwpel38zgutvLHmFD39E2hpPdQhcXn4c4dt1fU5KvkbcDdVbP8+e4TvNpJMy" + "/nEB2V92zCQ/hhBjilwhF1ETe1TMmVjALs0KFvbxW" + "9ZN3EdUVvxFvz/gvG29nQhl4QWKj3x8opr89lmq14Z7T0mzOV8kub+cgsOU" + "/1bsKqrIqN1fMKKFhjKaetctdjYTfGzVQ0AJAzzbtg" + "0/Q1wdYNAnhSDafygEv6kNiquk0r0RyasUUevEXs2LY3vSgKsKseI8ZZlQEMtE9/k" + "/iAG7JNcEbVg53YTurNTrPnXJOU88mf3TToX" + "14HpYsS1ECAwEAATANBgkqhkiG9w0BAQsFAAOCAQEAfolx45w0i8CdAUjjeAaYdhG9" + "+NDHxop0UvNOqlGqYJexqPLuvX8iyUaYxNG" + "zZxFgGI3GpKfmQP2JQWQ1E5JtY/n8iNLOKRMwqkuxSCKJxZJq4Sl/m" + "/Yv7TS1P5LNgAj8QLCypxsWrTAmq2HSpkeSk4JBtsYxX6uh" + "bGM/K1sEktKybVTHu22/7TmRqWTmOUy9wQvMjJb2IXdMGLG3hVntN" + "/WWcs5w8vbt1i8Kk6o19W2MjZ95JaECKjBDYRlhG1KmSBtrs" + "KsCBQoBzwH/rXfksTO9JoUYLXiW0IppB7DhNH4PJ5hZI91R8rR0H3" + "/bKkLSuDaKLWSqMhozdhXsIIKvJQ==" + ], + "x5t": "MnC_VZcATfM5pOYiJHMba9goEKY" + }, + { + "e": "AQAB", + "issuer": "https://login.microsoftonline.com/9188040d-6c67-4c5b" + "-b112-36a304b66dad/v2.0/", + "kid": "GvnPApfWMdLRi8PDmisFn7bprKg", + "kty": "RSA", + "n": "5ymq_xwmst1nstPr8YFOTyD1J5N4idYmrph7AyAv95RbWXfDRqy8CMRG7sJq" + "-UWOKVOA4MVrd_NdV-ejj1DE5MPSiG" + "-mZK_5iqRCDFvPYqOyRj539xaTlARNY4jeXZ0N6irZYKqSfYACjkkKxbLKcijSu1pJ48thXOTED0oNa6U", + "use": "sig", + "x5c": [ + "MIICWzCCAcSgAwIBAgIJAKVzMH2FfC12MA0GCSqGSIb3DQEBBQUAMCkxJzAlBgNVBAMTHkxpdmUgSUQgU1RTIFNpZ25pbmcgUHVib" + "GljIEtleTAeFw0xMzExMTExODMzMDhaFw0xNjExMTAxODMzMDhaMCkxJzAlBgNVBAMTHkxpdmUgSUQgU1RTIFNpZ25pbmcgUHVibG" + "ljIEtleTCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEA5ymq" + "/xwmst1nstPr8YFOTyD1J5N4idYmrph7AyAv95RbWXfDRqy8CMR" + "G7sJq+UWOKVOA4MVrd/NdV+ejj1DE5MPSiG+mZK" + "/5iqRCDFvPYqOyRj539xaTlARNY4jeXZ0N6irZYKqSfYACjkkKxbLKcijSu1pJ" + "48thXOTED0oNa6UCAwEAAaOBijCBhzAdBgNVHQ4EFgQURCN" + "+4cb0pvkykJCUmpjyfUfnRMowWQYDVR0jBFIwUIAURCN+4cb0pvkyk" + "JCUmpjyfUfnRMqhLaQrMCkxJzAlBgNVBAMTHkxpdmUgSUQgU1RTIFNpZ25pbmcgUHVibGljIEtleYIJAKVzMH2FfC12MAsGA1UdDw" + "QEAwIBxjANBgkqhkiG9w0BAQUFAAOBgQB8v8G5" + "/vUl8k7xVuTmMTDA878AcBKBrJ/Hp6RShmdqEGVI7SFR7IlBN1//NwD0n" + "+Iqzmn" + "RV2PPZ7iRgMF/Fyvqi96Gd8X53ds/FaiQpZjUUtcO3fk0hDRQPtCYMII5jq" + "+YAYjSybvF84saB7HGtucVRn2nMZc5cAC42QNYIlPM" + "qA==" + ], + "x5t": "GvnPApfWMdLRi8PDmisFn7bprKg" + }, + { + "e": "AQAB", + "issuer": "https://login.microsoftonline.com/9188040d-6c67-4c5b" + "-b112-36a304b66dad/v2.0/", + "kid": "dEtpjbEvbhfgwUI-bdK5xAU_9UQ", + "kty": "RSA", + "n": + "x7HNcD9ZxTFRaAgZ7-gdYLkgQua3zvQseqBJIt8Uq3MimInMZoE9QGQeSML7qZPlowb5BUakdLI70ayM4vN36--0ht8-oCHhl8Yj" + "GFQkU-Iv2yahWHEP-1EK6eOEYu6INQP9Lk0HMk3QViLwshwb" + "-KXVD02jdmX2HNdYJdPyc0c", + "use": "sig", + "x5c": [ + "MIICWzCCAcSgAwIBAgIJAL3MzqqEFMYjMA0GCSqGSIb3DQEBBQUAMCkxJzAlBgNVBAMTHkxpdmUgSUQgU1RTIFNpZ25pbmcgUHVib" + "GljIEtleTAeFw0xMzExMTExOTA1MDJaFw0xOTExMTAxOTA1MDJaMCkxJzAlBgNVBAMTHkxpdmUgSUQgU1RTIFNpZ25pbmcgUHVibG" + "ljIEtleTCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEAx7HNcD9ZxTFRaAgZ7" + "+gdYLkgQua3zvQseqBJIt8Uq3MimInMZoE9QGQ" + "eSML7qZPlowb5BUakdLI70ayM4vN36++0ht8+oCHhl8YjGFQkU" + "+Iv2yahWHEP+1EK6eOEYu6INQP9Lk0HMk3QViLwshwb+KXVD02j" + "dmX2HNdYJdPyc0cCAwEAAaOBijCBhzAdBgNVHQ4EFgQULR0aj9AtiNMgqIY8ZyXZGsHcJ5gwWQYDVR0jBFIwUIAULR0aj9AtiNMgq" + "IY8ZyXZGsHcJ5ihLaQrMCkxJzAlBgNVBAMTHkxpdmUgSUQgU1RTIFNpZ25pbmcgUHVibGljIEtleYIJAL3MzqqEFMYjMAsGA1UdDw" + "QEAwIBxjANBgkqhkiG9w0BAQUFAAOBgQBshrsF9yls4ArxOKqXdQPDgHrbynZL8m1iinLI4TeSfmTCDevXVBJrQ6SgDkihl3aCj74" + "IEte2MWN78sHvLLTWTAkiQSlGf1Zb0durw+OvlunQ2AKbK79Qv0Q+wwGuK" + "+oymWc3GSdP1wZqk9dhrQxb3FtdU2tMke01QTut6wr7" + "ig==" + ], + "x5t": "dEtpjbEvbhfgwUI-bdK5xAU_9UQ" + } + ] +} + + +def test_build_keyissuer(): + keys = [ + {"type": "RSA", "use": ["enc", "sig"]}, + {"type": "EC", "crv": "P-256", "use": ["sig"]}, + ] + + key_issuer = build_keyissuer(keys) + jwks = key_issuer.export_jwks() + for key in jwks["keys"]: + assert "d" not in key # the JWKS shouldn't contain the private part + # of the keys + + assert len(key_issuer) == 3 # 3 keys + assert len(key_issuer.get('sig')) == 2 # 2 for signing + assert len(key_issuer.get('enc')) == 1 # 1 for encryption + + +def test_build_keyissuer_usage(): + keys = [ + {"type": "RSA", "use": ["enc", "sig"]}, + {"type": "EC", "crv": "P-256", "use": ["sig"]}, + {"type": "oct", "use": ["enc"]}, + {"type": "oct", "use": ["enc"]}, + ] + + key_issuer = build_keyissuer(keys) + jwks_sig = key_issuer.export_jwks(usage='sig') + jwks_enc = key_issuer.export_jwks(usage='enc') + assert len(jwks_sig.get('keys')) == 2 # A total of 2 keys with use=sig + assert len(jwks_enc.get('keys')) == 3 # A total of 3 keys with use=enc + + +def test_build_keyissuer_missing(tmpdir): + keys = [ + { + "type": "RSA", "key": os.path.join(tmpdir.dirname, "missing_file"), + "use": ["enc", "sig"] + }] + + key_issuer = build_keyissuer(keys) + + assert key_issuer is None + + +def test_build_RSA_keyjar_from_file(tmpdir): + keys = [ + { + "type": "RSA", "key": RSA0, + "use": ["enc", "sig"] + }] + + key_issuer = build_keyissuer(keys) + + assert len(key_issuer) == 2 + + +def test_build_EC_keyjar_missing(tmpdir): + keys = [ + { + "type": "EC", "key": os.path.join(tmpdir.dirname, "missing_file"), + "use": ["enc", "sig"] + }] + + key_issuer = build_keyissuer(keys) + + assert key_issuer is None + + +def test_build_EC_keyjar_from_file(tmpdir): + keys = [ + { + "type": "EC", "key": EC0, + "use": ["enc", "sig"] + }] + + key_issuer = build_keyissuer(keys) + + assert len(key_issuer) == 2 + + +class TestKeyJar(object): + def test_keyissuer_add(self): + issuer = KeyIssuer() + kb = keybundle_from_local_file(RSAKEY, "der", ["ver", "sig"]) + issuer.add_kb(kb) + assert len(issuer.all_keys()) == 1 + + def test_add_symmetric(self): + issuer = KeyIssuer() + issuer.add_symmetric('abcdefghijklmnop', ['sig']) + assert len(issuer.get('sig', 'oct')) == 1 + + def test_items(self): + issuer = KeyIssuer() + issuer.add_kb(KeyBundle( + [{"kty": "oct", "key": "abcdefghijklmnop", "use": "sig"}, + {"kty": "oct", "key": "ABCDEFGHIJKLMNOP", "use": "enc"}])) + issuer.add_kb(KeyBundle([ + {"kty": "oct", "key": "0123456789012345", "use": "sig"}, + {"kty": "oct", "key": "1234567890123456", "use": "enc"}])) + issuer.add_kb(keybundle_from_local_file(RSAKEY, "der", ["ver", "sig"])) + + assert len(issuer.all_keys()) == 5 + + def test_get_enc(self): + issuer = KeyIssuer() + issuer.add_kb(KeyBundle( + [{"kty": "oct", "key": "a1b2c3d4e5f6g7h8", "use": "sig"}, + {"kty": "oct", "key": "a1b2c3d4e5f6g7h8", "use": "enc"}])) + issuer.add_kb(KeyBundle([ + {"kty": "oct", "key": "1a2b3c4d5e6f7g8h", "use": "sig"}, + {"kty": "oct", "key": "1a2b3c4d5e6f7g8h", "use": "enc"}])) + issuer.add_kb(keybundle_from_local_file(RSAKEY, "der", ["ver", "sig"])) + + assert issuer.get('enc', 'oct') + + def test_get_enc_not_mine(self): + issuer = KeyIssuer() + issuer.add_kb(KeyBundle( + [{"kty": "oct", "key": "a1b2c3d4e5f6g7h8", "use": "sig"}, + {"kty": "oct", "key": "a1b2c3d4e5f6g7h8", "use": "enc"}])) + issuer.add_kb(KeyBundle([ + {"kty": "oct", "key": "1a2b3c4d5e6f7g8h", "use": "sig"}, + {"kty": "oct", "key": "1a2b3c4d5e6f7g8h", "use": "ver"}])) + issuer.add_kb(keybundle_from_local_file(RSAKEY, "der", ["ver", "sig"])) + + assert issuer.get('enc', 'oct') + + def test_dump_issuer_keys(self): + kb = keybundle_from_local_file("file://%s/jwk.json" % BASE_PATH, "jwks", + ["sig"]) + assert len(kb) == 1 + issuer = KeyIssuer() + issuer.add_kb(kb) + _jwks_dict = issuer.export_jwks() + + _info = _jwks_dict['keys'][0] + assert _info == { + 'use': 'sig', + 'e': 'AQAB', + 'kty': 'RSA', + 'alg': 'RS256', + 'n': 'pKybs0WaHU_y4cHxWbm8Wzj66HtcyFn7Fh3n' + '-99qTXu5yNa30MRYIYfSDwe9JVc1JUoGw41yq2StdGBJ40HxichjE' + '-Yopfu3B58Q' + 'lgJvToUbWD4gmTDGgMGxQxtv1En2yedaynQ73sDpIK-12JJDY55pvf' + '-PCiSQ9OjxZLiVGKlClDus44_uv2370b9IN2JiEOF-a7JB' + 'qaTEYLPpXaoKWDSnJNonr79tL0T7iuJmO1l705oO3Y0TQ' + '-INLY6jnKG_RpsvyvGNnwP9pMvcP1phKsWZ10ofuuhJGRp8IxQL9Rfz' + 'T87OvF0RBSO1U73h09YP-corWDsnKIi6TbzRpN5YDw', + 'kid': 'abc' + } + + def test_no_use(self): + kb = KeyBundle(JWK0["keys"]) + issuer = KeyIssuer() + issuer.add_kb(kb) + enc_key = issuer.get('enc', "RSA") + assert enc_key != [] + + # @pytest.mark.network + # def test_provider(self): + # issuer = KeyIssuer() + # issuer.load_keys(jwks_uri="https://connect-op.herokuapp.com/jwks.json") + # + # assert issuer.get("https://connect-op.heroku.com")[0].keys() + + +def test_import_jwks(): + issuer = KeyIssuer() + issuer.import_jwks(JWK1) + assert len(issuer.all_keys()) == 2 + + +def test_get_signing_key_use_undefined(): + issuer = KeyIssuer() + issuer.import_jwks(JWK1) + keys = issuer.get('sig', kid='rsa1') + assert len(keys) == 1 + + keys = issuer.get('sig', key_type='rsa') + assert len(keys) == 1 + + keys = issuer.get('sig', key_type='rsa', kid='rsa1') + assert len(keys) == 1 + + +KEYDEFS = [ + {"type": "RSA", "key": '', "use": ["sig"]}, + {"type": "EC", "crv": "P-256", "use": ["sig"]} +] + + +def test_remove_after(): + # initial key_issuer + key_issuer = build_keyissuer(KEYDEFS) + _old = [k.kid for k in key_issuer.all_keys() if k.kid] + assert len(_old) == 2 + + key_issuer.remove_after = 1 + # rotate_keys = create new keys + make the old as inactive + key_issuer = key_issuer.rotate_keys(KEYDEFS) + + key_issuer.remove_outdated(time.time() + 3600) + + _interm = [k.kid for k in key_issuer.all_keys() if k.kid] + assert len(_interm) == 2 + + # The remainder are the new keys + _new = [k.kid for k in key_issuer.all_keys() if k.kid] + assert len(_new) == 2 + + # should not be any overlap between old and new + assert set(_new).intersection(set(_old)) == set() + + +JWK_UK = { + "keys": [ + { + "n": + "zkpUgEgXICI54blf6iWiD2RbMDCOO1jV0VSff1MFFnujM4othfMsad7H1kRo50YM5S_X9TdvrpdOfpz5aBaKFhT6Ziv0nhtcekq1eRl8" + "mjBlvGKCE5XGk-0LFSDwvqgkJoFYInq7bu0a4JEzKs5AyJY75YlGh879k1Uu2Sv3ZZOunfV1O1Orta" + "-NvS-aG_jN5cstVbCGWE20H0vF" + "VrJKNx0Zf-u-aA-syM4uX7wdWgQ" + "-owoEMHge0GmGgzso2lwOYf_4znanLwEuO3p5aabEaFoKNR4K6GjQcjBcYmDEE4CtfRU9AEmhcD1k" + "leiTB9TjPWkgDmT9MXsGxBHf3AKT5w", + "e": "AQAB", "kty": "RSA", "kid": "rsa1" + }, + { + "k": + "YTEyZjBlMDgxMGI4YWU4Y2JjZDFiYTFlZTBjYzljNDU3YWM0ZWNiNzhmNmFlYTNkNTY0NzMzYjE", + "kty": "buz" + }, + ] +} + + +def test_load_unknown_keytype(): + issuer = KeyIssuer() + issuer.import_jwks(JWK_UK) + assert len(issuer.all_keys()) == 1 + + +JWK_FP = { + "keys": [ + {"e": "AQAB", "kty": "RSA", "kid": "rsa1"}, + ] +} + + +def test_load_missing_key_parameter(): + issuer = KeyIssuer() + with pytest.raises(JWKESTException): + issuer.import_jwks(JWK_FP) + + +JWKS_SPO = { + "keys": [ + { + "kid": + "BfxfnahEtkRBG3Hojc9XGLGht_5rDBj49Wh3sBDVnzRpulMqYwMRmpizA0aSPT1fhCHYivTiaucWUqFu_GwTqA", + "use": "sig", + "alg": "ES256", + "kty": "EC", + "crv": "P-256", + "x": "1XXUXq75gOPZ4bEj1o2Z5XKJWSs6LmL6fAOK3vyMzSc", + "y": "ac1h_DwyuUxhkrD9oKMJ-b_KuiVvvSARIwT-XoEmDXs" + }, + { + "kid": + "91pD1H81rXUvrfg9mkngIG-tXjnldykKUVbITDIU1SgJvq91b8clOcJuEHNAq61eIvg8owpEvWcWAtlbV2awyA", + "use": "sig", + "alg": "ES256", + "kty": "EC", + "crv": "P-256", + "x": "2DfQoLpZS2j3hHEcHDkzV8ISx-RdLt6Opy8YZYVm4AQ", + "y": "ycvkFMBIzgsowiaf6500YlG4vaMSK4OF7WVtQpUbEE0" + }, + { + "kid": "0sIEl3MUJiCxrqleEBBF-_bZq5uClE84xp-wpt8oOI" + "-WIeNxBjSR4ak_OTOmLdndB0EfDLtC7X1JrnfZILJkxA", + "use": "sig", + "alg": "RS256", + "kty": "RSA", + "n": + "yG9914Q1j63Os4jX5dBQbUfImGq4zsXJD4R59XNjGJlEt5ek6NoiDl0ucJO3_7_R9e5my2ONTSqZhtzFW6MImnIn8idWYzJzO2EhUPCHTvw_2oOGjeYTE2VltIyY_ogIxGwY66G0fVPRRH9tCxnkGOrIvmVgkhCCGkamqeXuWvx9MCHL_gJbZJVwogPSRN_SjA1gDlvsyCdA6__CkgAFcSt1sGgiZ_4cQheKexxf1-7l8R91ZYetz53drk2FS3SfuMZuwMM4KbXt6CifNhzh1Ye-5Tr_ZENXdAvuBRDzfy168xnk9m0JBtvul9GoVIqvCVECB4MPUb7zU6FTIcwRAw", + "e": "AQAB" + }, + { + "kid": + "zyDfdEU7pvH0xEROK156ik8G7vLO1MIL9TKyL631kSPtr9tnvs9XOIiq5jafK2hrGr2qqvJdejmoonlGqWWZRA", + "use": "sig", + "alg": "RS256", + "kty": "RSA", + "n": + "68be-nJp46VLj4Ci1V36IrVGYqkuBfYNyjQTZD_7yRYcERZebowOnwr3w0DoIQpl8iL2X8OXUo7rUW_LMzLxKx2hEmdJfUn4LL2QqA3KPgjYz8hZJQPG92O14w9IZ-8bdDUgXrg9216H09yq6ZvJrn5Nwvap3MXgECEzsZ6zQLRKdb_R96KFFgCiI3bEiZKvZJRA7hM2ePyTm15D9En_Wzzfn_JLMYgE_DlVpoKR1MsTinfACOlwwdO9U5Dm-5elapovILTyVTgjN75i-wsPU2TqzdHFKA-4hJNiWGrYPiihlAFbA2eUSXuEYFkX43ahoQNpeaf0mc17Jt5kp7pM2w", + "e": "AQAB" + }, + { + "kid": "q-H9y8iuh3BIKZBbK6S0mH_isBlJsk" + "-u6VtZ5rAdBo5fCjjy3LnkrsoK_QWrlKB08j_PcvwpAMfTEDHw5spepw", + "use": "sig", + "alg": "EdDSA", + "kty": "OKP", + "crv": "Ed25519", + "x": "FnbcUAXZ4ySvrmdXK1MrDuiqlqTXvGdAaE4RWZjmFIQ" + }, + { + "kid": + "bL33HthM3fWaYkY2_pDzUd7a65FV2R2LHAKCOsye8eNmAPDgRgpHWPYpWFVmeaujUUEXRyDLHN" + "-Up4QH_sFcmw", + "use": "sig", + "alg": "EdDSA", + "kty": "OKP", + "crv": "Ed25519", + "x": "CS01DGXDBPV9cFmd8tgFu3E7eHn1UcP7N1UCgd_JgZo" + } + ] +} + + +def test_load_spomky_keys(): + issuer = KeyIssuer() + issuer.import_jwks(JWKS_SPO) + assert len(issuer) == 4 + + +def test_get_ec(): + issuer = KeyIssuer() + issuer.import_jwks(JWKS_SPO) + k = issuer.get('sig', 'EC', alg='ES256') + assert k + + +def test_get_ec_wrong_alg(): + issuer = KeyIssuer() + issuer.import_jwks(JWKS_SPO) + k = issuer.get('sig', 'EC', alg='ES512') + assert k == [] + + +def test_keyissuer_eq(): + kj1 = KeyIssuer() + kj1.import_jwks(JWKS_SPO) + + kj2 = KeyIssuer() + kj2.import_jwks(JWKS_SPO) + + assert kj1 == kj2 + + +PUBLIC_FILE = '{}/public_jwks.json'.format(BASEDIR) +PRIVATE_FILE = '{}/private_jwks.json'.format(BASEDIR) +KEYSPEC = [ + {"type": "RSA", "use": ["sig"]}, + {"type": "EC", "crv": "P-256", "use": ["sig"]} +] +KEYSPEC_2 = [ + {"type": "RSA", "use": ["sig"]}, + {"type": "EC", "crv": "P-256", "use": ["sig"]}, + {"type": "EC", "crv": "P-384", "use": ["sig"]} +] +KEYSPEC_3 = [ + {"type": "RSA", "use": ["sig"]}, + {"type": "EC", "crv": "P-256", "use": ["sig"]}, + {"type": "EC", "crv": "P-384", "use": ["sig"]}, + {"type": "EC", "crv": "P-521", "use": ["sig"]} +] +KEYSPEC_4 = [ + {"type": "RSA", "use": ["sig"]}, + {"type": "RSA", "use": ["sig"]}, + {"type": "EC", "crv": "P-256", "use": ["sig"]}, + {"type": "EC", "crv": "P-384", "use": ["sig"]} +] +KEYSPEC_5 = [ + {"type": "EC", "crv": "P-256", "use": ["sig"]}, + {"type": "EC", "crv": "P-384", "use": ["sig"]} +] + + +def test_init_key_issuer(): + # Nothing written to file + _keyissuer = init_key_issuer(key_defs=KEYSPEC) + assert len(_keyissuer) == 2 + + +def test_init_key_jar_dump_public(): + for _file in [PRIVATE_FILE, PUBLIC_FILE]: + if os.path.isfile(_file): + os.unlink(_file) + + # JWKS with public keys written to file + _keyissuer = init_key_issuer(public_path=PUBLIC_FILE, key_defs=KEYSPEC) + assert len(_keyissuer) == 2 + + # JWKS will be read from disc, not created new + _keyissuer2 = init_key_issuer(public_path=PUBLIC_FILE, key_defs=KEYSPEC) + assert len(_keyissuer2) == 2 + + # verify that the 2 Key jars contains the same keys + + +def test_init_key_issuer_dump_private(): + for _file in [PRIVATE_FILE, PUBLIC_FILE]: + if os.path.isfile(_file): + os.unlink(_file) + + # New set of keys, JWKSs with keys and public written to file + _keyissuer = init_key_issuer(private_path=PRIVATE_FILE, key_defs=KEYSPEC, read_only=False) + + # JWKS will be read from disc, not created new + _keyissuer2 = init_key_issuer(private_path=PRIVATE_FILE, key_defs=KEYSPEC, read_only=False) + assert _keyissuer == _keyissuer2 + + +def test_init_key_issuer_update(): + for _file in [PRIVATE_FILE, PUBLIC_FILE]: + if os.path.isfile(_file): + os.unlink(_file) + + # New set of keys, JWKSs with keys and public written to file + _keyissuer_1 = init_key_issuer(private_path=PRIVATE_FILE, key_defs=KEYSPEC, + public_path=PUBLIC_FILE, read_only=False) + assert len(_keyissuer_1) == 2 + + _keyissuer_2 = init_key_issuer(private_path=PRIVATE_FILE, key_defs=KEYSPEC_2, + public_path=PUBLIC_FILE) + + # Both should contain the same RSA key + rsa1 = _keyissuer_1.get('sig', 'RSA') + rsa2 = _keyissuer_2.get('sig', 'RSA') + + assert len(rsa1) == 1 + assert len(rsa2) == 1 + assert rsa1[0] == rsa2[0] + + # keyjar1 should only contain one EC key while keyjar2 should contain 2. + + ec1 = _keyissuer_1.get('sig', 'EC') + ec2 = _keyissuer_2.get('sig', 'EC', '') + assert len(ec1) == 1 + assert len(ec2) == 2 + + # The file on disc should not have changed + _keyissuer_3 = init_key_issuer(private_path=PRIVATE_FILE) + + assert len(_keyissuer_3.get('sig', 'RSA')) == 1 + assert len(_keyissuer_3.get('sig', 'EC')) == 1 + + _keyissuer_4 = init_key_issuer(private_path=PRIVATE_FILE, key_defs=KEYSPEC_2, + public_path=PUBLIC_FILE, read_only=False) + + # Now it should + _keyissuer_5 = init_key_issuer(private_path=PRIVATE_FILE) + + assert len(_keyissuer_5.get('sig', 'RSA')) == 1 + assert len(_keyissuer_5.get('sig', 'EC')) == 2 + + +OIDC_KEYS = { + 'private_path': "{}/priv/jwks.json".format(BASEDIR), + 'key_defs': KEYSPEC, + 'public_path': '{}/public/jwks.json'.format(BASEDIR) +} + + +def test_init_key_issuer_create_directories(): + # make sure the directories are gone + for _dir in ['priv', 'public']: + if os.path.isdir("{}/{}".format(BASEDIR, _dir)): + shutil.rmtree("{}/{}".format(BASEDIR, _dir)) + + _keyissuer = init_key_issuer(**OIDC_KEYS) + assert len(_keyissuer.get('sig', 'RSA')) == 1 + assert len(_keyissuer.get('sig', 'EC')) == 1 + + +def test_dump(): + issuer = KeyIssuer() + issuer.add_kb(KeyBundle(JWK2['keys'])) + + res = issuer.dump() + + nkj = KeyIssuer().load(res) + assert nkj.get('sig', 'rsa', kid="kriMPdmBvx68skT8-mPAB3BseeA") + assert nkj.get('sig', 'rsa', kid='MnC_VZcATfM5pOYiJHMba9goEKY') + + +def test_contains(): + issuer = KeyIssuer() + issuer.add_kb(KeyBundle(JWK1['keys'])) + for k in issuer.all_keys(): + assert k in issuer diff --git a/tests/test_04_key_jar.py b/tests/test_04_key_jar.py index 23cd06a..24e253f 100755 --- a/tests/test_04_key_jar.py +++ b/tests/test_04_key_jar.py @@ -15,11 +15,11 @@ from cryptojwt.key_jar import KeyJar from cryptojwt.key_jar import build_keyjar from cryptojwt.key_jar import init_key_jar -from cryptojwt.key_jar import key_summary -from cryptojwt.key_jar import update_keyjar __author__ = 'Roland Hedberg' +from cryptojwt.key_jar import rotate_keys + BASE_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "test_keys")) RSAKEY = os.path.join(BASE_PATH, "cert.key") @@ -207,7 +207,7 @@ def test_build_keyjar(): assert "d" not in key # the JWKS shouldn't contain the private part # of the keys - assert len(keyjar[""]) == 1 # One key bundle + assert len(keyjar[""]) == 3 # 3 keys assert len(keyjar.get_issuer_keys('')) == 3 # A total of 3 keys assert len(keyjar.get('sig')) == 2 # 2 for signing assert len(keyjar.get('enc')) == 1 # 1 for encryption @@ -237,7 +237,7 @@ def test_build_keyjar_missing(tmpdir): key_jar = build_keyjar(keys) - assert len(key_jar[""]) == 1 + assert key_jar is None def test_build_RSA_keyjar_from_file(tmpdir): @@ -249,7 +249,7 @@ def test_build_RSA_keyjar_from_file(tmpdir): key_jar = build_keyjar(keys) - assert len(key_jar[""]) == 1 + assert len(key_jar[""]) == 2 def test_build_EC_keyjar_missing(tmpdir): @@ -261,7 +261,7 @@ def test_build_EC_keyjar_missing(tmpdir): key_jar = build_keyjar(keys) - assert len(key_jar[""]) == 1 + assert key_jar is None def test_build_EC_keyjar_from_file(tmpdir): @@ -273,7 +273,7 @@ def test_build_EC_keyjar_from_file(tmpdir): key_jar = build_keyjar(keys) - assert len(key_jar[""]) == 1 + assert len(key_jar[""]) == 2 class TestKeyJar(object): @@ -286,7 +286,7 @@ def test_keyjar_add(self): def test_setitem(self): kj = KeyJar() kb = keybundle_from_local_file(RSAKEY, "der", ["ver", "sig"]) - kj['https://issuer.example.com'] = kb + kj.add_kb('https://issuer.example.com', kb) assert list(kj.owners()) == ['https://issuer.example.com'] def test_add_symmetric(self): @@ -297,66 +297,66 @@ def test_add_symmetric(self): def test_items(self): ks = KeyJar() - ks[""] = KeyBundle( + ks.add_kb("", KeyBundle( [{"kty": "oct", "key": "abcdefghijklmnop", "use": "sig"}, - {"kty": "oct", "key": "ABCDEFGHIJKLMNOP", "use": "enc"}]) - ks["http://www.example.org"] = KeyBundle([ + {"kty": "oct", "key": "ABCDEFGHIJKLMNOP", "use": "enc"}])) + ks.add_kb("http://www.example.org", KeyBundle([ {"kty": "oct", "key": "0123456789012345", "use": "sig"}, - {"kty": "oct", "key": "1234567890123456", "use": "enc"}]) - ks["http://www.example.org"].append( - keybundle_from_local_file(RSAKEY, "der", ["ver", "sig"])) + {"kty": "oct", "key": "1234567890123456", "use": "enc"}])) + ks.add_kb("http://www.example.org", + keybundle_from_local_file(RSAKEY, "der", ["ver", "sig"])) assert len(ks.items()) == 2 def test_issuer_extra_slash(self): ks = KeyJar() - ks[""] = KeyBundle( + ks.add_kb("", KeyBundle( [{"kty": "oct", "key": "abcdefghijklmnop", "use": "sig"}, - {"kty": "oct", "key": "ABCDEFGHIJKLMNOP", "use": "enc"}]) - ks["http://www.example.org"] = KeyBundle([ + {"kty": "oct", "key": "ABCDEFGHIJKLMNOP", "use": "enc"}])) + ks.add_kb("http://www.example.org", KeyBundle([ {"kty": "oct", "key": "0123456789012345", "use": "sig"}, - {"kty": "oct", "key": "1234567890123456", "use": "enc"}]) - ks["http://www.example.org"].append( - keybundle_from_local_file(RSAKEY, "der", ["ver", "sig"])) + {"kty": "oct", "key": "1234567890123456", "use": "enc"}])) + ks.add_kb("http://www.example.org", + keybundle_from_local_file(RSAKEY, "der", ["ver", "sig"])) assert ks.get('sig', 'RSA', 'http://www.example.org/') def test_issuer_missing_slash(self): ks = KeyJar() - ks[""] = KeyBundle( + ks.add_kb("", KeyBundle( [{"kty": "oct", "key": "a1b2c3d4e5f6g7h8", "use": "sig"}, - {"kty": "oct", "key": "a1b2c3d4e5f6g7h8", "use": "enc"}]) - ks["http://www.example.org/"] = KeyBundle([ + {"kty": "oct", "key": "a1b2c3d4e5f6g7h8", "use": "enc"}])) + ks.add_kb("http://www.example.org/", KeyBundle([ {"kty": "oct", "key": "1a2b3c4d5e6f7g8h", "use": "sig"}, - {"kty": "oct", "key": "1a2b3c4d5e6f7g8h", "use": "enc"}]) - ks["http://www.example.org/"].append( - keybundle_from_local_file(RSAKEY, "der", ["ver", "sig"])) + {"kty": "oct", "key": "1a2b3c4d5e6f7g8h", "use": "enc"}])) + ks.add_kb("http://www.example.org/", + keybundle_from_local_file(RSAKEY, "der", ["ver", "sig"])) assert ks.get('sig', 'RSA', 'http://www.example.org') def test_get_enc(self): ks = KeyJar() - ks[""] = KeyBundle( + ks.add_kb("", KeyBundle( [{"kty": "oct", "key": "a1b2c3d4e5f6g7h8", "use": "sig"}, - {"kty": "oct", "key": "a1b2c3d4e5f6g7h8", "use": "enc"}]) - ks["http://www.example.org/"] = KeyBundle([ + {"kty": "oct", "key": "a1b2c3d4e5f6g7h8", "use": "enc"}])) + ks.add_kb("http://www.example.org/", KeyBundle([ {"kty": "oct", "key": "1a2b3c4d5e6f7g8h", "use": "sig"}, - {"kty": "oct", "key": "1a2b3c4d5e6f7g8h", "use": "enc"}]) - ks["http://www.example.org/"].append( - keybundle_from_local_file(RSAKEY, "der", ["ver", "sig"])) + {"kty": "oct", "key": "1a2b3c4d5e6f7g8h", "use": "enc"}])) + ks.add_kb("http://www.example.org/", + keybundle_from_local_file(RSAKEY, "der", ["ver", "sig"])) assert ks.get('enc', 'oct') def test_get_enc_not_mine(self): ks = KeyJar() - ks[""] = KeyBundle( + ks.add_kb("", KeyBundle( [{"kty": "oct", "key": "a1b2c3d4e5f6g7h8", "use": "sig"}, - {"kty": "oct", "key": "a1b2c3d4e5f6g7h8", "use": "enc"}]) - ks["http://www.example.org/"] = KeyBundle([ + {"kty": "oct", "key": "a1b2c3d4e5f6g7h8", "use": "enc"}])) + ks.add_kb("http://www.example.org/", KeyBundle([ {"kty": "oct", "key": "1a2b3c4d5e6f7g8h", "use": "sig"}, - {"kty": "oct", "key": "1a2b3c4d5e6f7g8h", "use": "ver"}]) - ks["http://www.example.org/"].append( - keybundle_from_local_file(RSAKEY, "der", ["ver", "sig"])) + {"kty": "oct", "key": "1a2b3c4d5e6f7g8h", "use": "enc"}])) + ks.add_kb("http://www.example.org/", + keybundle_from_local_file(RSAKEY, "der", ["ver", "sig"])) assert ks.get('enc', 'oct', 'http://www.example.org/') @@ -365,7 +365,7 @@ def test_dump_issuer_keys(self): ["sig"]) assert len(kb) == 1 kj = KeyJar() - kj.issuer_keys[""] = [kb] + kj.add_kb("", kb) _jwks_dict = kj.export_jwks() _info = _jwks_dict['keys'][0] @@ -388,17 +388,17 @@ def test_dump_issuer_keys(self): def test_no_use(self): kb = KeyBundle(JWK0["keys"]) kj = KeyJar() - kj.issuer_keys["abcdefgh"] = [kb] + kj.add_kb("abcdefgh", kb) enc_key = kj.get_encrypt_key("RSA", "abcdefgh") assert enc_key != [] @pytest.mark.network def test_provider(self): - ks = KeyJar() - ks.load_keys("https://connect-op.heroku.com", + kj = KeyJar() + kj.load_keys("https://connect-op.heroku.com", jwks_uri="https://connect-op.herokuapp.com/jwks.json") - assert ks["https://connect-op.heroku.com"][0].keys() + assert kj.get_issuer_keys("https://connect-op.heroku.com")[0].keys() def test_import_jwks(): @@ -432,24 +432,14 @@ def test_remove_after(): _old = [k.kid for k in keyjar.get_issuer_keys('') if k.kid] assert len(_old) == 2 + keyjar.remove_after = 1 # rotate_keys = create new keys + make the old as inactive - keyjar = build_keyjar(KEYDEFS, keyjar=keyjar) + keyjar = rotate_keys(KEYDEFS, keyjar=keyjar) - keyjar.remove_after = 1 - # None are remove since none are marked as inactive yet - keyjar.remove_outdated() + keyjar.remove_outdated(time.time() + 3600) _interm = [k.kid for k in keyjar.get_issuer_keys('') if k.kid] - assert len(_interm) == 4 - - # Now mark the keys to be inactivated - _now = time.time() - for k in keyjar.get_issuer_keys(''): - if k.kid in _old: - if not k.inactive_since: - k.inactive_since = _now - - keyjar.remove_outdated(_now + 5) + assert len(_interm) == 2 # The remainder are the new keys _new = [k.kid for k in keyjar.get_issuer_keys('') if k.kid] @@ -618,23 +608,24 @@ def setup(self): self.alice_keyjar = build_keyjar(mkey) # Bob has one single keys self.bob_keyjar = build_keyjar(skey) - self.alice_keyjar['Alice'] = self.alice_keyjar[''] - self.bob_keyjar['Bob'] = self.bob_keyjar[''] + self.alice_keyjar.import_jwks(self.alice_keyjar.export_jwks(private=True, issuer_id=''), + 'Alice') + self.bob_keyjar.import_jwks(self.bob_keyjar.export_jwks(private=True, issuer_id=''), 'Bob') # To Alice's keyjar add Bob's public keys self.alice_keyjar.import_jwks( - self.bob_keyjar.export_jwks(issuer='Bob'), 'Bob') + self.bob_keyjar.export_jwks(issuer_id='Bob'), 'Bob') # To Bob's keyjar add Alice's public keys self.bob_keyjar.import_jwks( - self.alice_keyjar.export_jwks(issuer='Alice'), 'Alice') + self.alice_keyjar.export_jwks(issuer_id='Alice'), 'Alice') _jws = JWS('{"aud": "Bob", "iss": "Alice"}', alg='RS256') - sig_key = self.alice_keyjar.get_signing_key('rsa', owner='Alice')[0] + sig_key = self.alice_keyjar.get_signing_key('rsa', issuer_id='Alice')[0] self.sjwt_a = _jws.sign_compact([sig_key]) _jws = JWS('{"aud": "Alice", "iss": "Bob"}', alg='RS256') - sig_key = self.bob_keyjar.get_signing_key('rsa', owner='Bob')[0] + sig_key = self.bob_keyjar.get_signing_key('rsa', issuer_id='Bob')[0] self.sjwt_b = _jws.sign_compact([sig_key]) def test_no_kid_multiple_keys(self): @@ -656,7 +647,7 @@ def test_no_kid_single_key(self): def test_no_kid_multiple_keys_no_kid_issuer(self): a_kids = [k.kid for k in - self.alice_keyjar.get_verify_key(owner='Alice', + self.alice_keyjar.get_verify_key(issuer_id='Alice', key_type='RSA')] no_kid_issuer = {'Alice': a_kids} _jwt = factory(self.sjwt_a) @@ -685,11 +676,11 @@ def test_no_matching_kid(self): assert keys == [] def test_aud(self): - self.alice_keyjar.import_jwks(JWK1, issuer='D') - self.bob_keyjar.import_jwks(JWK1, issuer='D') + self.alice_keyjar.import_jwks(JWK1, issuer_id='D') + self.bob_keyjar.import_jwks(JWK1, issuer_id='D') _jws = JWS('{"iss": "D", "aud": "A"}', alg='HS256') - sig_key = self.alice_keyjar.get_signing_key('oct', owner='D')[0] + sig_key = self.alice_keyjar.get_signing_key('oct', issuer_id='D')[0] _sjwt = _jws.sign_compact([sig_key]) no_kid_issuer = {'D': []} @@ -703,9 +694,9 @@ def test_aud(self): def test_copy(): kj = KeyJar() - kj['Alice'] = [KeyBundle(JWK0['keys'])] - kj['Bob'] = [KeyBundle(JWK1['keys'])] - kj['C'] = [KeyBundle(JWK2['keys'])] + kj.add_kb('Alice', KeyBundle(JWK0['keys'])) + kj.add_kb('Bob', KeyBundle(JWK1['keys'])) + kj.add_kb('C', KeyBundle(JWK2['keys'])) kjc = kj.copy() @@ -723,9 +714,9 @@ def test_copy(): def test_repr(): kj = KeyJar() - kj['Alice'] = [KeyBundle(JWK0['keys'])] - kj['Bob'] = [KeyBundle(JWK1['keys'])] - kj['C'] = [KeyBundle(JWK2['keys'])] + kj.add_kb('Alice', KeyBundle(JWK0['keys'])) + kj.add_kb('Bob', KeyBundle(JWK1['keys'])) + kj.add_kb('C', KeyBundle(JWK2['keys'])) txt = kj.__repr__() assert "