diff --git a/CHANGELOG.md b/CHANGELOG.md index 37aed969a..2ebd09ee2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,10 @@ and this project adheres to ## [Unreleased] +## Added + +- ✨(oidc) add refresh token tools #584 + ## [2.5.0] - 2025-03-18 ## Added diff --git a/env.d/development/common.e2e.dist b/env.d/development/common.e2e.dist index 747b9a9d8..b4de88be1 100644 --- a/env.d/development/common.e2e.dist +++ b/env.d/development/common.e2e.dist @@ -4,3 +4,6 @@ BURST_THROTTLE_RATES="200/minute" DJANGO_SERVER_TO_SERVER_API_TOKENS=test-e2e Y_PROVIDER_API_KEY=yprovider-api-key Y_PROVIDER_API_BASE_URL=http://y-provider:4444/api/ + +# - add a key to store the refresh token in tests +OIDC_STORE_REFRESH_TOKEN_KEY=qnw7gZrOFLkLuZIixzuxksNORFJyjWyi5ACugNchKJY= diff --git a/src/backend/core/authentication/backends.py b/src/backend/core/authentication/backends.py index f8a7486dd..05d3b7285 100644 --- a/src/backend/core/authentication/backends.py +++ b/src/backend/core/authentication/backends.py @@ -1,21 +1,59 @@ """Authentication Backends for the Impress core app.""" import logging +from functools import lru_cache from django.conf import settings from django.core.exceptions import SuspiciousOperation from django.utils.translation import gettext_lazy as _ import requests +from cryptography.fernet import Fernet from mozilla_django_oidc.auth import ( OIDCAuthenticationBackend as MozillaOIDCAuthenticationBackend, ) +from mozilla_django_oidc.utils import import_from_settings from core.models import DuplicateEmailError, User logger = logging.getLogger(__name__) +@lru_cache(maxsize=0) +def get_cipher_suite(): + """Return a Fernet cipher suite.""" + key = import_from_settings("OIDC_STORE_REFRESH_TOKEN_KEY", None) + if not key: + raise ValueError("OIDC_STORE_REFRESH_TOKEN_KEY setting is required.") + return Fernet(key) + + +def store_oidc_refresh_token(session, refresh_token): + """Store the encrypted OIDC refresh token in the session if enabled in settings.""" + if import_from_settings("OIDC_STORE_REFRESH_TOKEN", False): + encrypted_token = get_cipher_suite().encrypt(refresh_token.encode()) + session["oidc_refresh_token"] = encrypted_token.decode() + + +def get_oidc_refresh_token(session): + """Retrieve and decrypt the OIDC refresh token from the session.""" + encrypted_token = session.get("oidc_refresh_token") + if encrypted_token: + return get_cipher_suite().decrypt(encrypted_token.encode()).decode() + return None + + +def store_tokens(session, access_token, id_token, refresh_token): + """Store tokens in the session if enabled in settings.""" + if import_from_settings("OIDC_STORE_ACCESS_TOKEN", False): + session["oidc_access_token"] = access_token + + if import_from_settings("OIDC_STORE_ID_TOKEN", False): + session["oidc_id_token"] = id_token + + store_oidc_refresh_token(session, refresh_token) + + class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend): """Custom OpenID Connect (OIDC) Authentication Backend. @@ -23,6 +61,40 @@ class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend): in the User and Identity models, and handles signed and/or encrypted UserInfo response. """ + def __init__(self, *args, **kwargs): + """ + Initialize the OIDC Authentication Backend. + + Adds an internal attribute to store the token_info dictionary. + The purpose of `self._token_info` is to not duplicate code from + the original `authenticate` method. + This won't be needed after https://github.com/mozilla/mozilla-django-oidc/pull/377 + is merged. + """ + super().__init__(*args, **kwargs) + self._token_info = None + + def get_token(self, payload): + """ + Return token object as a dictionary. + + Store the value to extract the refresh token in the `authenticate` method. + """ + self._token_info = super().get_token(payload) + return self._token_info + + def authenticate(self, request, **kwargs): + """Authenticates a user based on the OIDC code flow.""" + user = super().authenticate(request, **kwargs) + + if user is not None: + # Then the user successfully authenticated + store_oidc_refresh_token( + request.session, self._token_info.get("refresh_token") + ) + + return user + def get_userinfo(self, access_token, id_token, payload): """Return user details dictionary. diff --git a/src/backend/core/authentication/decorators.py b/src/backend/core/authentication/decorators.py new file mode 100644 index 000000000..433f7249c --- /dev/null +++ b/src/backend/core/authentication/decorators.py @@ -0,0 +1,12 @@ +""" +Decorators for the authentication app. + +We don't want (yet) to enforce the OIDC access token to be "fresh" for all +views, so we provide a decorator to refresh the access token only when needed. +""" + +from django.utils.decorators import decorator_from_middleware + +from .middleware import RefreshOIDCAccessToken + +refresh_oidc_access_token = decorator_from_middleware(RefreshOIDCAccessToken) diff --git a/src/backend/core/authentication/middleware.py b/src/backend/core/authentication/middleware.py new file mode 100644 index 000000000..878345ff1 --- /dev/null +++ b/src/backend/core/authentication/middleware.py @@ -0,0 +1,199 @@ +""" +Module to declare a RefreshOIDCAccessToken middleware that extends the +mozilla_django_oidc.middleware.SessionRefresh middleware to refresh the +access token when it expires, based on the OIDC provided refresh token. + +This is based on https://github.com/mozilla/mozilla-django-oidc/pull/377 +which is still not merged. +""" + +import json +import logging +import time +from urllib.parse import quote, urlencode + +from django.http import JsonResponse +from django.urls import reverse +from django.utils.crypto import get_random_string + +import requests +from mozilla_django_oidc.middleware import SessionRefresh + +try: + from mozilla_django_oidc.middleware import ( # pylint: disable=unused-import + RefreshOIDCAccessToken as MozillaRefreshOIDCAccessToken, + ) + + # If the import is successful, raise an error to notify the user that the + # version of mozilla_django_oidc added the expected middleware, and we don't need + # our implementation anymore. + # See https://github.com/mozilla/mozilla-django-oidc/pull/377 + raise RuntimeError("This version of mozilla_django_oidc has RefreshOIDCAccessToken") +except ImportError: + pass + +from mozilla_django_oidc.utils import ( + absolutify, + add_state_and_verifier_and_nonce_to_session, + import_from_settings, +) + +from core.authentication.backends import get_oidc_refresh_token, store_tokens + +logger = logging.getLogger(__name__) + + +class RefreshOIDCAccessToken(SessionRefresh): + """ + A middleware that will refresh the access token following proper OIDC protocol: + https://auth0.com/docs/tokens/refresh-token/current + + This is based on https://github.com/mozilla/mozilla-django-oidc/pull/377 + but limited to our needs (YAGNI/KISS). + """ + + def _prepare_reauthorization(self, request): + """ + Constructs a new authorization grant request to refresh the session. + Besides constructing the request, the state and nonce included in the + request are registered in the current session in preparation for the + client following through with the authorization flow. + """ + auth_url = self.OIDC_OP_AUTHORIZATION_ENDPOINT + client_id = self.OIDC_RP_CLIENT_ID + state = get_random_string(self.OIDC_STATE_SIZE) + + # Build the parameters as if we were doing a real auth handoff, except + # we also include prompt=none. + auth_params = { + "response_type": "code", + "client_id": client_id, + "redirect_uri": absolutify( + request, reverse(self.OIDC_AUTHENTICATION_CALLBACK_URL) + ), + "state": state, + "scope": self.OIDC_RP_SCOPES, + "prompt": "none", + } + + if self.OIDC_USE_NONCE: + nonce = get_random_string(self.OIDC_NONCE_SIZE) + auth_params.update({"nonce": nonce}) + + # Register the one-time parameters in the session + add_state_and_verifier_and_nonce_to_session(request, state, auth_params) + request.session["oidc_login_next"] = request.get_full_path() + + query = urlencode(auth_params, quote_via=quote) + return f"{auth_url}?{query}" + + def is_expired(self, request): + """Check whether the access token is expired and needs to be refreshed.""" + if not self.is_refreshable_url(request): + logger.debug("request is not refreshable") + return False + + expiration = request.session.get("oidc_token_expiration", 0) + now = time.time() + if expiration > now: + # The id_token is still valid, so we don't have to do anything. + logger.debug("id token is still valid (%s > %s)", expiration, now) + return False + + return True + + def finish(self, request, prompt_reauth=True): + """Finish request handling and handle sending downstream responses for XHR. + This function should only be run if the session is determind to + be expired. + Almost all XHR request handling in client-side code struggles + with redirects since redirecting to a page where the user + is supposed to do something is extremely unlikely to work + in an XHR request. Make a special response for these kinds + of requests. + The use of 403 Forbidden is to match the fact that this + middleware doesn't really want the user in if they don't + refresh their session. + + WARNING: this varies from the original implementation: + - to return a 401 status code + - to consider all requests as XHR requests + """ + xhr_response_json = {"error": "the authentication session has expired"} + if prompt_reauth: + # The id_token has expired, so we have to re-authenticate silently. + refresh_url = self._prepare_reauthorization(request) + xhr_response_json["refresh_url"] = refresh_url + + xhr_response = JsonResponse(xhr_response_json, status=401) + if "refresh_url" in xhr_response_json: + xhr_response["refresh_url"] = xhr_response_json["refresh_url"] + return xhr_response + + def process_request(self, request): # noqa: PLR0911 # pylint: disable=too-many-return-statements + """Process the request and refresh the access token if necessary.""" + if not self.is_expired(request): + return None + + token_url = self.get_settings("OIDC_OP_TOKEN_ENDPOINT") + client_id = self.get_settings("OIDC_RP_CLIENT_ID") + client_secret = self.get_settings("OIDC_RP_CLIENT_SECRET") + refresh_token = get_oidc_refresh_token(request.session) + + if not refresh_token: + logger.debug("no refresh token stored") + return self.finish(request, prompt_reauth=True) + + token_payload = { + "grant_type": "refresh_token", + "client_id": client_id, + "client_secret": client_secret, + "refresh_token": refresh_token, + } + + req_auth = None + if self.get_settings("OIDC_TOKEN_USE_BASIC_AUTH", False): + # supported in https://github.com/mozilla/mozilla-django-oidc/pull/377 + # but we don't need it, so enforce error here. + raise RuntimeError("OIDC_TOKEN_USE_BASIC_AUTH is not supported") + + try: + response = requests.post( + token_url, + auth=req_auth, + data=token_payload, + verify=import_from_settings("OIDC_VERIFY_SSL", True), + timeout=import_from_settings("OIDC_TIMEOUT", 3), + ) + response.raise_for_status() + token_info = response.json() + except requests.exceptions.Timeout: + logger.debug("timed out refreshing access token") + # Don't prompt for reauth as this could be a temporary problem + return self.finish(request, prompt_reauth=False) + except requests.exceptions.HTTPError as exc: + status_code = exc.response.status_code + logger.debug("http error %s when refreshing access token", status_code) + # OAuth error response will be a 400 for various situations, including + # an expired token. https://datatracker.ietf.org/doc/html/rfc6749#section-5.2 + return self.finish(request, prompt_reauth=status_code == 400) + except json.JSONDecodeError: + logger.debug("malformed response when refreshing access token") + # Don't prompt for reauth as this could be a temporary problem + return self.finish(request, prompt_reauth=False) + except Exception as exc: # pylint: disable=broad-except + logger.exception( + "unknown error occurred when refreshing access token: %s", exc + ) + # Don't prompt for reauth as this could be a temporary problem + return self.finish(request, prompt_reauth=False) + + # Until we can properly validate an ID token on the refresh response + # per the spec[1], we intentionally drop the id_token. + # [1]: https://openid.net/specs/openid-connect-core-1_0.html#RefreshTokenResponse + id_token = None + access_token = token_info.get("access_token") + refresh_token = token_info.get("refresh_token") + store_tokens(request.session, access_token, id_token, refresh_token) + + return None diff --git a/src/backend/core/tests/authentication/test_backends.py b/src/backend/core/tests/authentication/test_backends.py index 8bd47caba..55b6e7a65 100644 --- a/src/backend/core/tests/authentication/test_backends.py +++ b/src/backend/core/tests/authentication/test_backends.py @@ -10,14 +10,37 @@ import pytest import responses +from cryptography.fernet import Fernet from core import models -from core.authentication.backends import OIDCAuthenticationBackend +from core.authentication.backends import ( + OIDCAuthenticationBackend, + get_oidc_refresh_token, + store_oidc_refresh_token, +) from core.factories import UserFactory pytestmark = pytest.mark.django_db +def test_oidc_refresh_token_session_store(settings): + """Test that the OIDC refresh token is stored and retrieved from the session.""" + session = {} + + with pytest.raises( + ValueError, match="OIDC_STORE_REFRESH_TOKEN_KEY setting is required." + ): + store_oidc_refresh_token(session, "test-refresh-token") + + settings.OIDC_STORE_REFRESH_TOKEN_KEY = Fernet.generate_key() + + store_oidc_refresh_token(session, "test-refresh-token") + assert session["oidc_refresh_token"] is not None + assert session["oidc_refresh_token"] != "test-refresh-token" + + assert get_oidc_refresh_token(session) == "test-refresh-token" + + def test_authentication_getter_existing_user_no_email( django_assert_num_queries, monkeypatch ): @@ -547,3 +570,56 @@ def get_userinfo_mocked(*args): assert user.full_name == "Doe" assert user.short_name is None assert user.email == "john.doe@example.com" + + +@responses.activate +def test_authentication_session_tokens( + django_assert_num_queries, monkeypatch, rf, settings +): + """ + Test that the session contains oidc_refresh_token and oidc_access_token after authentication. + """ + settings.OIDC_OP_TOKEN_ENDPOINT = "http://oidc.endpoint.test/token" + settings.OIDC_OP_USER_ENDPOINT = "http://oidc.endpoint.test/userinfo" + settings.OIDC_OP_JWKS_ENDPOINT = "http://oidc.endpoint.test/jwks" + settings.OIDC_STORE_ACCESS_TOKEN = True + settings.OIDC_STORE_REFRESH_TOKEN = True + settings.OIDC_STORE_REFRESH_TOKEN_KEY = Fernet.generate_key() + + klass = OIDCAuthenticationBackend() + request = rf.get("/some-url", {"state": "test-state", "code": "test-code"}) + request.session = {} + + def verify_token_mocked(*args, **kwargs): + return {"sub": "123", "email": "test@example.com"} + + monkeypatch.setattr(OIDCAuthenticationBackend, "verify_token", verify_token_mocked) + + responses.add( + responses.POST, + re.compile(settings.OIDC_OP_TOKEN_ENDPOINT), + json={ + "access_token": "test-access-token", + "refresh_token": "test-refresh-token", + }, + status=200, + ) + + responses.add( + responses.GET, + re.compile(settings.OIDC_OP_USER_ENDPOINT), + json={"sub": "123", "email": "test@example.com"}, + status=200, + ) + + with django_assert_num_queries(6): + user = klass.authenticate( + request, + code="test-code", + nonce="test-nonce", + code_verifier="test-code-verifier", + ) + + assert user is not None + assert request.session["oidc_access_token"] == "test-access-token" + assert get_oidc_refresh_token(request.session) == "test-refresh-token" diff --git a/src/backend/core/tests/authentication/test_decorators.py b/src/backend/core/tests/authentication/test_decorators.py new file mode 100644 index 000000000..fa3a1ce5e --- /dev/null +++ b/src/backend/core/tests/authentication/test_decorators.py @@ -0,0 +1,55 @@ +"""Tests for the refresh_oidc_access_token decorator in core app.""" + +from unittest.mock import patch + +from django.http import HttpResponse +from django.test import RequestFactory +from django.utils.decorators import method_decorator +from django.views import View + +from core.authentication.decorators import refresh_oidc_access_token + + +class RefreshOIDCAccessTokenView(View): + """ + A Django view that uses the refresh_oidc_access_token decorator to refresh + the OIDC access token before processing the request. + """ + + @method_decorator(refresh_oidc_access_token) + def dispatch(self, request, *args, **kwargs): + """ + Overrides the dispatch method to apply the refresh_oidc_access_token decorator. + """ + return super().dispatch(request, *args, **kwargs) + + def get(self, request, *args, **kwargs): + """ + Handles GET requests. + + Returns: + HttpResponse: A simple HTTP response with "OK" as the content. + """ + return HttpResponse("OK") + + +def test_refresh_oidc_access_token_decorator(): + """ + Tests the refresh_oidc_access_token decorator is called on RefreshOIDCAccessTokenView access. + + The test creates a mock request and patches the dispatch method to verify that it is called + with the correct request object. + """ + # Create a test request + factory = RequestFactory() + request = factory.get("/") + + # Mock the OIDC refresh functionality + with patch( + "core.authentication.middleware.RefreshOIDCAccessToken.process_request" + ) as mock_refresh: + # Call the decorated view + RefreshOIDCAccessTokenView.as_view()(request) + + # Assert that the refresh method was called + mock_refresh.assert_called_once_with(request) diff --git a/src/backend/core/tests/authentication/test_middleware.py b/src/backend/core/tests/authentication/test_middleware.py new file mode 100644 index 000000000..188bab96a --- /dev/null +++ b/src/backend/core/tests/authentication/test_middleware.py @@ -0,0 +1,327 @@ +"""Tests for the RefreshOIDCAccessToken middleware.""" + +import time +from unittest.mock import MagicMock + +from django.contrib.auth.models import AnonymousUser +from django.contrib.sessions.middleware import SessionMiddleware +from django.http import HttpResponse, JsonResponse +from django.test import RequestFactory + +import pytest +import requests.exceptions +import responses +from cryptography.fernet import Fernet + +from core import factories +from core.authentication.backends import ( + get_cipher_suite, + get_oidc_refresh_token, + store_oidc_refresh_token, +) +from core.authentication.middleware import RefreshOIDCAccessToken + +pytestmark = pytest.mark.django_db + + +@pytest.fixture(name="oidc_settings") +def fixture_oidc_settings(settings): + """Fixture to configure OIDC settings for the tests.""" + settings.OIDC_OP_TOKEN_ENDPOINT = "https://auth.example.com/token" + settings.OIDC_OP_AUTHORIZATION_ENDPOINT = "https://auth.example.com/authorize" + settings.OIDC_RP_CLIENT_ID = "client_id" + settings.OIDC_RP_CLIENT_SECRET = "client_secret" + settings.OIDC_AUTHENTICATION_CALLBACK_URL = "oidc_authentication_callback" + settings.OIDC_RP_SCOPES = "openid email" + settings.OIDC_USE_NONCE = True + settings.OIDC_STATE_SIZE = 32 + settings.OIDC_NONCE_SIZE = 32 + settings.OIDC_VERIFY_SSL = True + settings.OIDC_TOKEN_USE_BASIC_AUTH = False + settings.OIDC_STORE_ACCESS_TOKEN = True + settings.OIDC_STORE_REFRESH_TOKEN = True + settings.OIDC_STORE_REFRESH_TOKEN_KEY = Fernet.generate_key() + + get_cipher_suite.cache_clear() + + yield settings + + get_cipher_suite.cache_clear() + + +def test_anonymous_user(oidc_settings): # pylint: disable=unused-argument + """ + When the user is not authenticated, this + is not the purpose of the middleware to manage anything. + """ + request = RequestFactory().get("/test") + request.user = AnonymousUser() + + get_response = MagicMock() + session_middleware = SessionMiddleware(get_response) + session_middleware.process_request(request) + + middleware = RefreshOIDCAccessToken(get_response) + response = middleware.process_request(request) + assert response is None + + +def test_no_refresh_token(oidc_settings): # pylint: disable=unused-argument + """ + When the session does not contain a refresh token, + the middleware should return a 401 response containing + the URL to authenticate again. + """ + user = factories.UserFactory() + + request = RequestFactory().get("/test") + request.user = user + + get_response = MagicMock() + session_middleware = SessionMiddleware(get_response) + session_middleware.process_request(request) + + request.session["oidc_access_token"] = ("expired_token",) + request.session["oidc_token_expiration"] = time.time() - 100 + + middleware = RefreshOIDCAccessToken(get_response) + response = middleware.process_request(request) + assert isinstance(response, JsonResponse) + assert response.status_code == 401 + assert response.has_header("refresh_url") + assert response["refresh_url"].startswith("https://auth.example.com/authorize") + + +def test_basic_auth_disabled(oidc_settings): # pylint: disable=unused-argument + """We don't support OIDC_TOKEN_USE_BASIC_AUTH""" + oidc_settings.OIDC_TOKEN_USE_BASIC_AUTH = True + + user = factories.UserFactory() + + request = RequestFactory().get("/test") + request.user = user + + get_response = MagicMock() + session_middleware = SessionMiddleware(get_response) + session_middleware.process_request(request) + + request.session["oidc_access_token"] = "old_token" + store_oidc_refresh_token(request.session, "refresh_token") + request.session["oidc_token_expiration"] = time.time() - 100 + request.session.save() + + middleware = RefreshOIDCAccessToken(get_response) + with pytest.raises(RuntimeError) as excinfo: + middleware.process_request(request) + + assert str(excinfo.value) == "OIDC_TOKEN_USE_BASIC_AUTH is not supported" + + +@responses.activate +def test_successful_token_refresh(oidc_settings): # pylint: disable=unused-argument + """Test that the middleware successfully refreshes the token.""" + user = factories.UserFactory() + + request = RequestFactory().get("/test") + request.user = user + + get_response = MagicMock() + session_middleware = SessionMiddleware(get_response) + session_middleware.process_request(request) + + request.session["oidc_access_token"] = "old_token" + store_oidc_refresh_token(request.session, "refresh_token") + request.session["oidc_token_expiration"] = time.time() - 100 + request.session.save() + + responses.add( + responses.POST, + "https://auth.example.com/token", + json={"access_token": "new_token", "refresh_token": "new_refresh_token"}, + status=200, + ) + + middleware = RefreshOIDCAccessToken(get_response) + response = middleware.process_request(request) + request.session.save() + + assert response is None + assert request.session["oidc_access_token"] == "new_token" + assert get_oidc_refresh_token(request.session) == "new_refresh_token" + + +def test_non_expired_token(oidc_settings): # pylint: disable=unused-argument + """Test that the middleware does nothing when the token is not expired.""" + user = factories.UserFactory() + + request = RequestFactory().get("/test") + request.user = user + + get_response = MagicMock() + session_middleware = SessionMiddleware(get_response) + session_middleware.process_request(request) + request.session["oidc_access_token"] = ("valid_token",) + request.session["oidc_token_expiration"] = time.time() + 3600 + request.session.save() + + middleware = RefreshOIDCAccessToken(get_response) + + response = middleware.process_request(request) + assert response is None + + +@responses.activate +def test_refresh_token_request_timeout(oidc_settings): # pylint: disable=unused-argument + """Test that the middleware returns a 401 response when the token refresh request times out.""" + user = factories.UserFactory() + request = RequestFactory().get("/test") + request.user = user + + get_response = MagicMock() + session_middleware = SessionMiddleware(get_response) + session_middleware.process_request(request) + request.session["oidc_access_token"] = "old_token" + store_oidc_refresh_token(request.session, "refresh_token") + request.session["oidc_token_expiration"] = time.time() - 100 + request.session.save() + + responses.add( + responses.POST, + "https://auth.example.com/token", + body=requests.exceptions.Timeout("timeout"), + ) + + middleware = RefreshOIDCAccessToken(get_response) + response = middleware.process_request(request) + assert isinstance(response, HttpResponse) + assert response.status_code == 401 + assert not response.has_header("refresh_url") + + +@responses.activate +def test_refresh_token_request_error_400(oidc_settings): # pylint: disable=unused-argument + """ + Test that the middleware returns a 401 response when the token + refresh request returns a 400 error. + """ + user = factories.UserFactory() + request = RequestFactory().get("/test") + request.user = user + + get_response = MagicMock() + session_middleware = SessionMiddleware(get_response) + session_middleware.process_request(request) + request.session["oidc_access_token"] = "old_token" + store_oidc_refresh_token(request.session, "refresh_token") + request.session["oidc_token_expiration"] = time.time() - 100 + request.session.save() + + responses.add( + responses.POST, + "https://auth.example.com/token", + json={"error": "invalid_grant"}, + status=400, + ) + + middleware = RefreshOIDCAccessToken(get_response) + response = middleware.process_request(request) + assert isinstance(response, HttpResponse) + assert response.status_code == 401 + assert response.has_header("refresh_url") + assert response["refresh_url"].startswith("https://auth.example.com/authorize") + + +@responses.activate +def test_refresh_token_request_error(oidc_settings): # pylint: disable=unused-argument + """ + Test that the middleware returns a 401 response when + the token refresh request returns a 404 error. + """ + user = factories.UserFactory() + request = RequestFactory().get("/test") + request.user = user + + get_response = MagicMock() + session_middleware = SessionMiddleware(get_response) + session_middleware.process_request(request) + request.session["oidc_access_token"] = "old_token" + store_oidc_refresh_token(request.session, "refresh_token") + request.session["oidc_token_expiration"] = time.time() - 100 + request.session.save() + + responses.add( + responses.POST, + "https://auth.example.com/token", + json={"error": "invalid_grant"}, + status=404, + ) + + middleware = RefreshOIDCAccessToken(get_response) + response = middleware.process_request(request) + assert isinstance(response, HttpResponse) + assert response.status_code == 401 + assert not response.has_header("refresh_url") + + +@responses.activate +def test_refresh_token_request_malformed_json_error(oidc_settings): # pylint: disable=unused-argument + """ + Test that the middleware returns a 401 response + when the token refresh request returns malformed JSON. + """ + user = factories.UserFactory() + request = RequestFactory().get("/test") + request.user = user + + get_response = MagicMock() + session_middleware = SessionMiddleware(get_response) + session_middleware.process_request(request) + request.session["oidc_access_token"] = "old_token" + store_oidc_refresh_token(request.session, "refresh_token") + request.session["oidc_token_expiration"] = time.time() - 100 + request.session.save() + + responses.add( + responses.POST, + "https://auth.example.com/token", + body="malformed json", + status=200, + ) + + middleware = RefreshOIDCAccessToken(get_response) + response = middleware.process_request(request) + assert isinstance(response, HttpResponse) + assert response.status_code == 401 + assert not response.has_header("refresh_url") + + +@responses.activate +def test_refresh_token_request_exception(oidc_settings): # pylint: disable=unused-argument + """ + Test that the middleware returns a 401 response + when the token refresh request raises an exception. + """ + user = factories.UserFactory() + request = RequestFactory().get("/test") + request.user = user + + get_response = MagicMock() + session_middleware = SessionMiddleware(get_response) + session_middleware.process_request(request) + request.session["oidc_access_token"] = "old_token" + store_oidc_refresh_token(request.session, "refresh_token") + request.session["oidc_token_expiration"] = time.time() - 100 + request.session.save() + + responses.add( + responses.POST, + "https://auth.example.com/token", + body={"error": "invalid_grant"}, # invalid format dict + status=200, + ) + + middleware = RefreshOIDCAccessToken(get_response) + response = middleware.process_request(request) + assert isinstance(response, HttpResponse) + assert response.status_code == 401 + assert not response.has_header("refresh_url") diff --git a/src/backend/impress/settings.py b/src/backend/impress/settings.py index 4a741f7c2..f4f64d168 100755 --- a/src/backend/impress/settings.py +++ b/src/backend/impress/settings.py @@ -487,6 +487,17 @@ class Base(Configuration): environ_name="OIDC_FALLBACK_TO_EMAIL_FOR_IDENTIFICATION", environ_prefix=None, ) + OIDC_STORE_ACCESS_TOKEN = values.BooleanValue( + default=True, environ_name="OIDC_STORE_ACCESS_TOKEN", environ_prefix=None + ) + OIDC_STORE_REFRESH_TOKEN = values.BooleanValue( + default=True, environ_name="OIDC_STORE_REFRESH_TOKEN", environ_prefix=None + ) + OIDC_STORE_REFRESH_TOKEN_KEY = values.Value( + default=None, + environ_name="OIDC_STORE_REFRESH_TOKEN_KEY", + environ_prefix=None, + ) # WARNING: Enabling this setting allows multiple user accounts to share the same email # address. This may cause security issues and is not recommended for production use when