Skip to content

✨(oidc) add refresh token tools #584

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ and this project adheres to

## [Unreleased]

## Added

- ✨(oidc) add refresh token tools #584

## [2.5.0] - 2025-03-18

## Added
Expand Down
3 changes: 3 additions & 0 deletions env.d/development/common.e2e.dist
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,6 @@ BURST_THROTTLE_RATES="200/minute"
DJANGO_SERVER_TO_SERVER_API_TOKENS=test-e2e
Y_PROVIDER_API_KEY=yprovider-api-key
Y_PROVIDER_API_BASE_URL=http://y-provider:4444/api/

# - add a key to store the refresh token in tests
OIDC_STORE_REFRESH_TOKEN_KEY=qnw7gZrOFLkLuZIixzuxksNORFJyjWyi5ACugNchKJY=
72 changes: 72 additions & 0 deletions src/backend/core/authentication/backends.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,100 @@
"""Authentication Backends for the Impress core app."""

import logging
from functools import lru_cache

from django.conf import settings
from django.core.exceptions import SuspiciousOperation
from django.utils.translation import gettext_lazy as _

import requests
from cryptography.fernet import Fernet
from mozilla_django_oidc.auth import (
OIDCAuthenticationBackend as MozillaOIDCAuthenticationBackend,
)
from mozilla_django_oidc.utils import import_from_settings

from core.models import DuplicateEmailError, User

logger = logging.getLogger(__name__)


@lru_cache(maxsize=0)
def get_cipher_suite():
"""Return a Fernet cipher suite."""
key = import_from_settings("OIDC_STORE_REFRESH_TOKEN_KEY", None)
if not key:
raise ValueError("OIDC_STORE_REFRESH_TOKEN_KEY setting is required.")
return Fernet(key)


def store_oidc_refresh_token(session, refresh_token):
"""Store the encrypted OIDC refresh token in the session if enabled in settings."""
if import_from_settings("OIDC_STORE_REFRESH_TOKEN", False):
encrypted_token = get_cipher_suite().encrypt(refresh_token.encode())
session["oidc_refresh_token"] = encrypted_token.decode()


def get_oidc_refresh_token(session):
"""Retrieve and decrypt the OIDC refresh token from the session."""
encrypted_token = session.get("oidc_refresh_token")
if encrypted_token:
return get_cipher_suite().decrypt(encrypted_token.encode()).decode()
return None


def store_tokens(session, access_token, id_token, refresh_token):
"""Store tokens in the session if enabled in settings."""
if import_from_settings("OIDC_STORE_ACCESS_TOKEN", False):
session["oidc_access_token"] = access_token

if import_from_settings("OIDC_STORE_ID_TOKEN", False):
session["oidc_id_token"] = id_token

store_oidc_refresh_token(session, refresh_token)


class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend):
"""Custom OpenID Connect (OIDC) Authentication Backend.

This class overrides the default OIDC Authentication Backend to accommodate differences
in the User and Identity models, and handles signed and/or encrypted UserInfo response.
"""

def __init__(self, *args, **kwargs):
"""
Initialize the OIDC Authentication Backend.

Adds an internal attribute to store the token_info dictionary.
The purpose of `self._token_info` is to not duplicate code from
the original `authenticate` method.
This won't be needed after https://github.com/mozilla/mozilla-django-oidc/pull/377
is merged.
"""
super().__init__(*args, **kwargs)
self._token_info = None

def get_token(self, payload):
"""
Return token object as a dictionary.

Store the value to extract the refresh token in the `authenticate` method.
"""
self._token_info = super().get_token(payload)
return self._token_info

def authenticate(self, request, **kwargs):
"""Authenticates a user based on the OIDC code flow."""
user = super().authenticate(request, **kwargs)

if user is not None:
# Then the user successfully authenticated
store_oidc_refresh_token(
request.session, self._token_info.get("refresh_token")
)

return user

def get_userinfo(self, access_token, id_token, payload):
"""Return user details dictionary.

Expand Down
12 changes: 12 additions & 0 deletions src/backend/core/authentication/decorators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
"""
Decorators for the authentication app.

We don't want (yet) to enforce the OIDC access token to be "fresh" for all
views, so we provide a decorator to refresh the access token only when needed.
"""

from django.utils.decorators import decorator_from_middleware

from .middleware import RefreshOIDCAccessToken

refresh_oidc_access_token = decorator_from_middleware(RefreshOIDCAccessToken)
199 changes: 199 additions & 0 deletions src/backend/core/authentication/middleware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
"""
Module to declare a RefreshOIDCAccessToken middleware that extends the
mozilla_django_oidc.middleware.SessionRefresh middleware to refresh the
access token when it expires, based on the OIDC provided refresh token.

This is based on https://github.com/mozilla/mozilla-django-oidc/pull/377
which is still not merged.
"""

import json
import logging
import time
from urllib.parse import quote, urlencode

from django.http import JsonResponse
from django.urls import reverse
from django.utils.crypto import get_random_string

import requests
from mozilla_django_oidc.middleware import SessionRefresh

try:
from mozilla_django_oidc.middleware import ( # pylint: disable=unused-import
RefreshOIDCAccessToken as MozillaRefreshOIDCAccessToken,
)

# If the import is successful, raise an error to notify the user that the
# version of mozilla_django_oidc added the expected middleware, and we don't need
# our implementation anymore.
# See https://github.com/mozilla/mozilla-django-oidc/pull/377
raise RuntimeError("This version of mozilla_django_oidc has RefreshOIDCAccessToken")
except ImportError:
pass

from mozilla_django_oidc.utils import (
absolutify,
add_state_and_verifier_and_nonce_to_session,
import_from_settings,
)

from core.authentication.backends import get_oidc_refresh_token, store_tokens

logger = logging.getLogger(__name__)


class RefreshOIDCAccessToken(SessionRefresh):
"""
A middleware that will refresh the access token following proper OIDC protocol:
https://auth0.com/docs/tokens/refresh-token/current

This is based on https://github.com/mozilla/mozilla-django-oidc/pull/377
but limited to our needs (YAGNI/KISS).
"""

def _prepare_reauthorization(self, request):
"""
Constructs a new authorization grant request to refresh the session.
Besides constructing the request, the state and nonce included in the
request are registered in the current session in preparation for the
client following through with the authorization flow.
"""
auth_url = self.OIDC_OP_AUTHORIZATION_ENDPOINT
client_id = self.OIDC_RP_CLIENT_ID
state = get_random_string(self.OIDC_STATE_SIZE)

# Build the parameters as if we were doing a real auth handoff, except
# we also include prompt=none.
auth_params = {
"response_type": "code",
"client_id": client_id,
"redirect_uri": absolutify(
request, reverse(self.OIDC_AUTHENTICATION_CALLBACK_URL)
),
"state": state,
"scope": self.OIDC_RP_SCOPES,
"prompt": "none",
}

if self.OIDC_USE_NONCE:
nonce = get_random_string(self.OIDC_NONCE_SIZE)
auth_params.update({"nonce": nonce})

# Register the one-time parameters in the session
add_state_and_verifier_and_nonce_to_session(request, state, auth_params)
request.session["oidc_login_next"] = request.get_full_path()

query = urlencode(auth_params, quote_via=quote)
return f"{auth_url}?{query}"

def is_expired(self, request):
"""Check whether the access token is expired and needs to be refreshed."""
if not self.is_refreshable_url(request):
logger.debug("request is not refreshable")
return False

expiration = request.session.get("oidc_token_expiration", 0)
now = time.time()
if expiration > now:
# The id_token is still valid, so we don't have to do anything.
logger.debug("id token is still valid (%s > %s)", expiration, now)
return False

return True

def finish(self, request, prompt_reauth=True):
"""Finish request handling and handle sending downstream responses for XHR.
This function should only be run if the session is determind to
be expired.
Almost all XHR request handling in client-side code struggles
with redirects since redirecting to a page where the user
is supposed to do something is extremely unlikely to work
in an XHR request. Make a special response for these kinds
of requests.
The use of 403 Forbidden is to match the fact that this
middleware doesn't really want the user in if they don't
refresh their session.

WARNING: this varies from the original implementation:
- to return a 401 status code
- to consider all requests as XHR requests
"""
xhr_response_json = {"error": "the authentication session has expired"}
if prompt_reauth:
# The id_token has expired, so we have to re-authenticate silently.
refresh_url = self._prepare_reauthorization(request)
xhr_response_json["refresh_url"] = refresh_url

xhr_response = JsonResponse(xhr_response_json, status=401)
if "refresh_url" in xhr_response_json:
xhr_response["refresh_url"] = xhr_response_json["refresh_url"]
return xhr_response

def process_request(self, request): # noqa: PLR0911 # pylint: disable=too-many-return-statements
"""Process the request and refresh the access token if necessary."""
if not self.is_expired(request):
return None

token_url = self.get_settings("OIDC_OP_TOKEN_ENDPOINT")
client_id = self.get_settings("OIDC_RP_CLIENT_ID")
client_secret = self.get_settings("OIDC_RP_CLIENT_SECRET")
refresh_token = get_oidc_refresh_token(request.session)

if not refresh_token:
logger.debug("no refresh token stored")
return self.finish(request, prompt_reauth=True)

token_payload = {
"grant_type": "refresh_token",
"client_id": client_id,
"client_secret": client_secret,
"refresh_token": refresh_token,
}

req_auth = None
if self.get_settings("OIDC_TOKEN_USE_BASIC_AUTH", False):
# supported in https://github.com/mozilla/mozilla-django-oidc/pull/377
# but we don't need it, so enforce error here.
raise RuntimeError("OIDC_TOKEN_USE_BASIC_AUTH is not supported")

try:
response = requests.post(
token_url,
auth=req_auth,
data=token_payload,
verify=import_from_settings("OIDC_VERIFY_SSL", True),
timeout=import_from_settings("OIDC_TIMEOUT", 3),
)
response.raise_for_status()
token_info = response.json()
except requests.exceptions.Timeout:
logger.debug("timed out refreshing access token")
# Don't prompt for reauth as this could be a temporary problem
return self.finish(request, prompt_reauth=False)
except requests.exceptions.HTTPError as exc:
status_code = exc.response.status_code
logger.debug("http error %s when refreshing access token", status_code)
# OAuth error response will be a 400 for various situations, including
# an expired token. https://datatracker.ietf.org/doc/html/rfc6749#section-5.2
return self.finish(request, prompt_reauth=status_code == 400)
except json.JSONDecodeError:
logger.debug("malformed response when refreshing access token")
# Don't prompt for reauth as this could be a temporary problem
return self.finish(request, prompt_reauth=False)
except Exception as exc: # pylint: disable=broad-except
logger.exception(
"unknown error occurred when refreshing access token: %s", exc
)
# Don't prompt for reauth as this could be a temporary problem
return self.finish(request, prompt_reauth=False)

# Until we can properly validate an ID token on the refresh response
# per the spec[1], we intentionally drop the id_token.
# [1]: https://openid.net/specs/openid-connect-core-1_0.html#RefreshTokenResponse
id_token = None
access_token = token_info.get("access_token")
refresh_token = token_info.get("refresh_token")
store_tokens(request.session, access_token, id_token, refresh_token)

return None
Loading
Loading