diff --git a/dependencies/pip/dev_requirements.txt b/dependencies/pip/dev_requirements.txt index 25baa77b6a..998e39923b 100644 --- a/dependencies/pip/dev_requirements.txt +++ b/dependencies/pip/dev_requirements.txt @@ -108,6 +108,7 @@ cron-descriptor==1.4.3 cryptography==42.0.5 # via # azure-storage-blob + # fido2 # jwcrypto # paramiko # pyopenssl @@ -172,7 +173,7 @@ django==4.2.24 # drf-spectacular # jsonfield # model-bakery -django-allauth==65.11.2 +django-allauth[mfa]==65.11.2 # via -r dependencies/pip/requirements.in django-amazon-ses==4.0.1 # via -r dependencies/pip/requirements.in @@ -276,6 +277,8 @@ fabric==3.2.2 # via -r dependencies/pip/dev_requirements.in fakeredis==2.30.1 # via -r dependencies/pip/dev_requirements.in +fido2==2.0.0 + # via django-allauth flake8==7.1.1 # via # -r dependencies/pip/dev_requirements.in @@ -557,6 +560,8 @@ pyyaml==6.0.1 # via # drf-spectacular # responses +qrcode==8.2 + # via django-allauth redis==5.0.3 # via # celery diff --git a/dependencies/pip/requirements.in b/dependencies/pip/requirements.in index cc94e908e5..3f3dbbdfc8 100644 --- a/dependencies/pip/requirements.in +++ b/dependencies/pip/requirements.in @@ -28,7 +28,8 @@ dict2xml defusedxml dj-static dj-stripe -django-allauth +django-allauth>=65.11.2 +django-allauth[mfa] django-braces django-celery-beat django-constance @@ -116,4 +117,4 @@ djangorestframework-jsonp pandas # Api Documentation -drf-spectacular \ No newline at end of file +drf-spectacular diff --git a/dependencies/pip/requirements.txt b/dependencies/pip/requirements.txt index 5d376492ab..1aeff92c95 100644 --- a/dependencies/pip/requirements.txt +++ b/dependencies/pip/requirements.txt @@ -92,6 +92,7 @@ cron-descriptor==1.4.3 cryptography==42.0.5 # via # azure-storage-blob + # fido2 # jwcrypto # pyopenssl cssselect==1.2.0 @@ -140,7 +141,7 @@ django==4.2.24 # djangorestframework # drf-spectacular # jsonfield -django-allauth==65.11.2 +django-allauth[mfa]==65.11.2 # via -r dependencies/pip/requirements.in django-amazon-ses==4.0.1 # via -r dependencies/pip/requirements.in @@ -230,6 +231,8 @@ drf-spectacular==0.28.0 # via -r dependencies/pip/requirements.in et-xmlfile==1.1.0 # via openpyxl +fido2==2.0.0 + # via django-allauth flower==2.0.1 # via -r dependencies/pip/requirements.in frozenlist==1.4.1 @@ -430,6 +433,8 @@ pyyaml==6.0.1 # via # drf-spectacular # responses +qrcode==8.2 + # via django-allauth redis==5.0.3 # via # celery diff --git a/hub/admin/extend_user.py b/hub/admin/extend_user.py index 711ac48817..12d1812bc7 100644 --- a/hub/admin/extend_user.py +++ b/hub/admin/extend_user.py @@ -14,7 +14,7 @@ from django.utils import timezone from django.utils.safestring import mark_safe -from kobo.apps.accounts.mfa.models import MfaMethod +from kobo.apps.accounts.mfa.models import MfaMethodsWrapper from kobo.apps.accounts.validators import ( USERNAME_INVALID_MESSAGE, USERNAME_MAX_LENGTH, @@ -27,7 +27,6 @@ from kobo.apps.trash_bin.models.account import AccountTrash from kobo.apps.trash_bin.utils import move_to_trash from kpi.models.asset import AssetDeploymentStatus - from .filters import UserAdvancedSearchFilter from .mixins import AdvancedSearchMixin @@ -37,7 +36,7 @@ def validate_superuser_auth(obj) -> bool: obj.is_superuser and config.SUPERUSER_AUTH_ENFORCEMENT and obj.has_usable_password() - and not MfaMethod.objects.filter(user=obj, is_active=True).exists() + and not MfaMethodsWrapper.objects.filter(user=obj, is_active=True).exists() ): return False return True diff --git a/hub/tests/test_admin_validators.py b/hub/tests/test_admin_validators.py index 0b185a8ab0..a55edfb644 100644 --- a/hub/tests/test_admin_validators.py +++ b/hub/tests/test_admin_validators.py @@ -2,7 +2,7 @@ from django.test import TestCase from hub.admin.extend_user import validate_superuser_auth -from kobo.apps.accounts.mfa.models import MfaMethod +from kobo.apps.accounts.mfa.models import MfaMethodsWrapper from kobo.apps.kobo_auth.shortcuts import User @@ -22,5 +22,5 @@ def test_superuser_with_unusable_password(self): self.assertTrue(validate_superuser_auth(self.superuser)) def test_superuser_with_mfa_enabled(self): - MfaMethod.objects.create(user=self.superuser, is_active=True) + MfaMethodsWrapper.objects.create(user=self.superuser, is_active=True) self.assertTrue(validate_superuser_auth(self.superuser)) diff --git a/kobo/apps/__init__.py b/kobo/apps/__init__.py index d335b3d442..db660e4e78 100644 --- a/kobo/apps/__init__.py +++ b/kobo/apps/__init__.py @@ -1,8 +1,6 @@ -import trench from django.apps import AppConfig from django.core.checks import Tags, register -import kpi.utils.monkey_patching # noqa from kpi.utils.two_database_configuration_checker import TwoDatabaseConfigurationChecker @@ -10,24 +8,6 @@ class KpiConfig(AppConfig): name = 'kpi' def ready(self, *args, **kwargs): - # These imports cannot be at the top until the app is loaded. - from kobo.apps.accounts.mfa.command import ( - create_mfa_method_command, - deactivate_mfa_method_command, - ) - - # Monkey-patch `django-trench` to avoid duplicating lots of code in views, - # and serializers just for few line changes. - # Changed behaviours: - # 1. Stop blocking deactivation of primary method - trench.command.deactivate_mfa_method.deactivate_mfa_method_command = ( - deactivate_mfa_method_command - ) - # 2. Resetting secret on reactivation - trench.command.create_mfa_method.create_mfa_method_command = ( - create_mfa_method_command - ) - # Load all schema extension modules to register them import kpi.schema_extensions.imports # noqa F401 diff --git a/kobo/apps/accounts/adapter.py b/kobo/apps/accounts/adapter.py index 20de6cfb0d..b226069bda 100644 --- a/kobo/apps/accounts/adapter.py +++ b/kobo/apps/accounts/adapter.py @@ -2,22 +2,11 @@ from allauth.account.forms import SignupForm from constance import config from django.conf import settings -from django.contrib.auth import REDIRECT_FIELD_NAME, login from django.db import transaction -from django.shortcuts import resolve_url -from django.template.response import TemplateResponse from django.utils import timezone -from trench.utils import get_mfa_model, user_token_generator - -from .mfa.forms import MfaTokenForm -from .mfa.models import MfaAvailableToUser -from .mfa.permissions import mfa_allowed_for_user -from .mfa.views import MfaTokenView -from .utils import user_has_inactive_paid_subscription class AccountAdapter(DefaultAccountAdapter): - def is_open_for_signup(self, request): return config.REGISTRATION_OPEN @@ -26,44 +15,6 @@ def login(self, request, user): user.backend = settings.AUTHENTICATION_BACKENDS[0] super().login(request, user) - def pre_login(self, request, user, **kwargs): - - if parent_response := super().pre_login(request, user, **kwargs): - # A response from the parent means the login process must be - # interrupted, e.g. due to the user being inactive or not having - # validated their email address - return parent_response - - # If MFA is activated and allowed for the user, display the token form before letting them in - mfa_active = ( - get_mfa_model().objects.filter(is_active=True, user=user).exists() - ) - mfa_allowed = mfa_allowed_for_user(user) - inactive_subscription = user_has_inactive_paid_subscription( - user.username - ) - if mfa_active and (mfa_allowed or inactive_subscription): - ephemeral_token_cache = user_token_generator.make_token(user) - mfa_token_form = MfaTokenForm( - initial={'ephemeral_token': ephemeral_token_cache} - ) - - next_url = kwargs.get('redirect_url') or resolve_url( - settings.LOGIN_REDIRECT_URL - ) - - context = { - REDIRECT_FIELD_NAME: next_url, - 'view': MfaTokenView, - 'form': mfa_token_form, - } - - return TemplateResponse( - request=request, - template='mfa_token.html', - context=context, - ) - def save_user(self, request, user, form, commit=True): # Compare allauth SignupForm with our custom field standard_fields = set(SignupForm().fields.keys()) diff --git a/kobo/apps/accounts/mfa/adapter.py b/kobo/apps/accounts/mfa/adapter.py new file mode 100644 index 0000000000..ff4415996f --- /dev/null +++ b/kobo/apps/accounts/mfa/adapter.py @@ -0,0 +1,86 @@ +from typing import Optional + +from allauth.mfa.adapter import DefaultMFAAdapter +from allauth.mfa.models import Authenticator +from constance import config +from django.conf import settings + +from ..utils import user_has_inactive_paid_subscription +from .models import MfaMethod, MfaMethodsWrapper +from .permissions import mfa_allowed_for_user + + +class MfaAdapter(DefaultMFAAdapter): + + def is_mfa_enabled(self, user, types=None) -> bool: + # NOTE: This is a temporary thing. We are migrating users to the allauth tables + # When the migration is done it won't be necessary. + self.migrate_user(user) + mfa_active_super = super().is_mfa_enabled(user, types) + mfa_active = ( + mfa_active_super + and MfaMethodsWrapper.objects.filter(user=user, is_active=True).first() + is not None + ) + mfa_allowed = mfa_allowed_for_user(user) + inactive_subscription = user_has_inactive_paid_subscription(user.username) + return mfa_active and (mfa_allowed or inactive_subscription) + + def get_totp_label(self, user) -> str: + """Returns the label used for representing the given user in a TOTP QR + code. + """ + return f'{config.MFA_ISSUER_NAME}-{user.username}' + + def get_totp_issuer(self) -> str: + """Returns the TOTP issuer name that will be contained in the TOTP QR + code. + """ + return config.MFA_ISSUER_NAME + + def migrate_user( + self, user: settings.AUTH_USER_MODEL, mfa_method: MfaMethod = None + ) -> Optional[MfaMethodsWrapper]: + """Migrate user MFA data from trench tables to allauth tables""" + if not mfa_method: + mfa_method = ( + MfaMethod.objects.filter(name='app', user=user, is_active=True) + .order_by('is_primary') + .first() + ) + if not mfa_method: + return + authenticators = Authenticator.objects.filter(user_id=user.id) + types_ok = {a.type for a in authenticators} == { + Authenticator.Type.TOTP, + Authenticator.Type.RECOVERY_CODES, + } + # If allauth MFA Authenticators already exist, exit + if types_ok: + return + for authenticator in authenticators: + authenticator.delete() + + encrypted_secret = self.encrypt(mfa_method.secret) + totp_authenticator = Authenticator.objects.create( + user_id=mfa_method.user_id, + type=Authenticator.Type.TOTP, + data={'secret': encrypted_secret}, + ) + recovery_codes = Authenticator.objects.create( + user_id=mfa_method.user_id, + type=Authenticator.Type.RECOVERY_CODES, + data={ + 'migrated_codes': [self.encrypt(c) for c in mfa_method.backup_codes], + 'used_mask': 0, + }, + ) + mfa_method_wrapper = MfaMethodsWrapper.objects.create( + name='app', + user=user, + is_active=True, + totp=totp_authenticator, + recovery_codes=recovery_codes, + secret=encrypted_secret, + ) + return mfa_method_wrapper diff --git a/kobo/apps/accounts/mfa/admin.py b/kobo/apps/accounts/mfa/admin.py index ba13eee5c7..216e8addf1 100644 --- a/kobo/apps/accounts/mfa/admin.py +++ b/kobo/apps/accounts/mfa/admin.py @@ -2,13 +2,13 @@ from django.contrib import admin from .models import ( - TrenchMFAMethod, + ExtendedTrenchMfaMethodAdmin, MfaAvailableToUser, MfaAvailableToUserAdmin, MfaMethod, - MfaMethodAdmin, + TrenchMFAMethod, ) admin.site.unregister(TrenchMFAMethod) -admin.site.register(MfaMethod, MfaMethodAdmin) +admin.site.register(MfaMethod, ExtendedTrenchMfaMethodAdmin) admin.site.register(MfaAvailableToUser, MfaAvailableToUserAdmin) diff --git a/kobo/apps/accounts/mfa/apps.py b/kobo/apps/accounts/mfa/apps.py index e36682ba51..9148b5e56a 100644 --- a/kobo/apps/accounts/mfa/apps.py +++ b/kobo/apps/accounts/mfa/apps.py @@ -4,6 +4,7 @@ class MfaAppConfig(AppConfig): name = 'kobo.apps.accounts.mfa' verbose_name = 'Multi-factor authentication' + label = 'accounts_mfa' def ready(self): from . import signals # noqa F401 diff --git a/kobo/apps/accounts/mfa/backends/__init__.py b/kobo/apps/accounts/mfa/backends/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/kobo/apps/accounts/mfa/backends/application.py b/kobo/apps/accounts/mfa/backends/application.py deleted file mode 100644 index 3c33924c57..0000000000 --- a/kobo/apps/accounts/mfa/backends/application.py +++ /dev/null @@ -1,30 +0,0 @@ -# coding: utf-8 -import constance -import pyotp -from django.conf import settings -from trench.backends.application import ApplicationMessageDispatcher - -from kobo.apps.kobo_auth.shortcuts import User - - -class ApplicationBackend(ApplicationMessageDispatcher): - """ - Custom class based on `django-trench` AuthenticationAppBackend class. - It provides OTP based QR link to be scanned by like apps - Google Authenticator and Authy. - Unlike `django-trench`, it is also customizable number of digits, - validity period and issuer name. - """ - - def _create_qr_link(self, user: User) -> str: - return self._get_otp().provisioning_uri( - getattr(user, User.USERNAME_FIELD), - issuer_name=constance.config.MFA_ISSUER_NAME, - ) - - def _get_otp(self) -> pyotp.TOTP: - return pyotp.TOTP( - self._mfa_method.secret, - digits=settings.TRENCH_AUTH['CODE_LENGTH'], - interval=self._get_valid_window(), - ) diff --git a/kobo/apps/accounts/mfa/command/__init__.py b/kobo/apps/accounts/mfa/command/__init__.py deleted file mode 100644 index cdf967a491..0000000000 --- a/kobo/apps/accounts/mfa/command/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -from .create_mfa_method import create_mfa_method_command -from .deactivate_mfa_method import deactivate_mfa_method_command diff --git a/kobo/apps/accounts/mfa/command/create_mfa_method.py b/kobo/apps/accounts/mfa/command/create_mfa_method.py deleted file mode 100644 index d063069672..0000000000 --- a/kobo/apps/accounts/mfa/command/create_mfa_method.py +++ /dev/null @@ -1,46 +0,0 @@ -# coding: utf-8 -from trench.command.create_secret import create_secret_command -from trench.command.create_mfa_method import ( - CreateMFAMethodCommand as TrenchCreateMfAMethodCommand, -) -from trench.exceptions import MFAMethodAlreadyActiveError -from trench.models import MFAMethod -from trench.utils import get_mfa_model - - -class CreateMfaMethodCommand(TrenchCreateMfAMethodCommand): - """ - Overload `django-trench` behaviour which keeps the same secret forever even - if MFA method is disabled/enabled. - See https://github.com/merixstudio/django-trench/blob/6a9f3f049b6c1d97fbfd05e0527b693e04c33891/trench/command/create_mfa_method.py#L14-L25 - - When the client requests creation of a new MFA method, *always* create - a new secret. The default django-trench behavior reuses old secrets, - meaning that without this override, a given user would always see the - same QR code no matter how many times they enabled and disabled MFA. - That default behavior would create a security weakness if a user lost - their original device, because deactivating MFA and reconfiguring it on - a new device would not prevent the lost device from continuing to - generate valid TOTPs. - """ - def execute(self, user_id: int, name: str) -> MFAMethod: - mfa, created = self._mfa_model.objects.get_or_create( - user_id=user_id, - name=name, - defaults={ - 'secret': self._create_secret, - 'is_active': False, - }, - ) - if not created: - if mfa.is_active: - raise MFAMethodAlreadyActiveError() - else: - mfa.secret = self._create_secret() - mfa.save() - return mfa - - -create_mfa_method_command = CreateMfaMethodCommand( - secret_generator=create_secret_command, mfa_model=get_mfa_model() -).execute diff --git a/kobo/apps/accounts/mfa/command/deactivate_mfa_method.py b/kobo/apps/accounts/mfa/command/deactivate_mfa_method.py deleted file mode 100644 index 4495270acf..0000000000 --- a/kobo/apps/accounts/mfa/command/deactivate_mfa_method.py +++ /dev/null @@ -1,27 +0,0 @@ -# coding: utf-8 -from django.db.transaction import atomic -from trench.command.deactivate_mfa_method import DeactivateMFAMethodCommand -from trench.exceptions import MFANotEnabledError -from trench.utils import get_mfa_model - - -class DeactivateAllMfaMethodCommand(DeactivateMFAMethodCommand): - """ - Overload `django-trench` behaviour introduced with v0.3.0 which blocks the - deactivation of primary MFA method. - See https://github.com/merixstudio/django-trench/blob/990c0e9687eb8ed219d0d04a5cd69f71b27ec43b/trench/command/deactivate_mfa_method.py#L17-L18 - """ - @atomic - def execute(self, mfa_method_name: str, user_id: int) -> None: - mfa = self._mfa_model.objects.get_by_name(user_id=user_id, name=mfa_method_name) - if not mfa.is_active: - raise MFANotEnabledError() - - self._mfa_model.objects.filter(user_id=user_id, name=mfa_method_name).update( - is_active=False - ) - - -deactivate_mfa_method_command = DeactivateAllMfaMethodCommand( - mfa_model=get_mfa_model() -).execute diff --git a/kobo/apps/accounts/mfa/flows.py b/kobo/apps/accounts/mfa/flows.py new file mode 100644 index 0000000000..070f42578c --- /dev/null +++ b/kobo/apps/accounts/mfa/flows.py @@ -0,0 +1,70 @@ +from allauth.headless.mfa.inputs import ActivateTOTPInput +from allauth.mfa import signals +from allauth.mfa.base.internal.flows import delete_and_cleanup +from allauth.mfa.models import Authenticator +from allauth.mfa.recovery_codes.internal.auth import RecoveryCodes +from allauth.mfa.recovery_codes.internal.flows import auto_generate_recovery_codes +from allauth.mfa.totp.internal.auth import TOTP +from rest_framework.exceptions import NotFound, ValidationError + +from .models import MfaMethodsWrapper + + +def activate_totp(request, name): + try: + mfa = MfaMethodsWrapper.objects.get( + user=request.user, + name=name, + is_active=False, + ) + except MfaMethodsWrapper.DoesNotExist: + raise NotFound + + form = ActivateTOTPInput(request.data, user=request.user) + if not form.is_valid(): + raise ValidationError(detail=form.errors) + + totp = TOTP.activate(request.user, form.secret).instance + signals.authenticator_added.send( + sender=Authenticator, + request=request, + user=request.user, + authenticator=totp, + ) + recovery_codes = auto_generate_recovery_codes(request) + mfa.totp = totp + mfa.recovery_codes = recovery_codes + mfa.is_active = True + mfa.save() + + return totp.wrap(), recovery_codes.wrap() + + +def regenerate_codes(request, name): + try: + mfa = MfaMethodsWrapper.objects.get( + user=request.user, + name=name, + is_active=True, + ) + except MfaMethodsWrapper.DoesNotExist: + raise NotFound + mfa.recovery_codes.delete() + mfa.recovery_codes = RecoveryCodes.activate(request.user).instance + mfa.save() + + return mfa.recovery_codes.wrap() + + +def deactivate_totp(request, name): + try: + mfa = MfaMethodsWrapper.objects.get( + user=request.user, + name=name, + is_active=True, + ) + except MfaMethodsWrapper.DoesNotExist: + raise NotFound + mfa.is_active = False + mfa.save() + delete_and_cleanup(request, mfa.totp) diff --git a/kobo/apps/accounts/mfa/forms.py b/kobo/apps/accounts/mfa/forms.py index 0d0956d816..7491a17318 100644 --- a/kobo/apps/accounts/mfa/forms.py +++ b/kobo/apps/accounts/mfa/forms.py @@ -1,106 +1,48 @@ -# coding: utf-8 -from django import forms -from django.conf import settings -from django.utils.translation import gettext_lazy as t -from trench.command.authenticate_second_factor import ( - authenticate_second_step_command, -) -from trench.exceptions import MFAValidationError -from trench.serializers import CodeLoginSerializer -from trench.utils import get_mfa_model, user_token_generator - -from kobo.apps.accounts.forms import LoginForm - - -class MfaLoginForm(LoginForm): - """ - Authenticating users. - If 2FA is activated, first step (of two) of the login process. - """ - - def __init__(self, *args, **kwargs): - self.ephemeral_token_cache = None - super().__init__(*args, **kwargs) - - def clean(self, *args, **kwargs): - cleaned_data = super().clean(*args, **kwargs) - # `super().clean()` initialize the object `self.user` with - # the user object retrieved from authentication (if any) - # Because we only support one 2FA method, we do not filter on - # `is_primary` too (as django_trench does). - # ToDo Figure out why `is_primary` is False sometimes after reactivating - # 2FA - if get_mfa_model().objects.filter(is_active=True, user=self.user).exists(): - self.ephemeral_token_cache = user_token_generator.make_token( - self.user - ) - - return cleaned_data - - def get_ephemeral_token(self): - return self.ephemeral_token_cache - - -class MfaTokenForm(forms.Form): - """ - Validate 2FA token. - Second (and last) step of login process when MFA is activated. - """ - - code = forms.CharField( - label='', - strip=True, - required=True, - widget=forms.TextInput( - attrs={ - 'placeholder': t( - 'Enter the ##token length##-character token' - ).replace( - '##token length##', str(settings.TRENCH_AUTH['CODE_LENGTH']) - ) - } - ), - ) - ephemeral_token = forms.CharField( - required=True, - widget=forms.HiddenInput(), - ) - - error_messages = {'invalid_code': t('Your token is invalid')} - - def __init__(self, request=None, *args, **kwargs): - self.user_cache = None - super().__init__(*args, **kwargs) - - def clean(self): - code_login_serializer = CodeLoginSerializer(data=self.cleaned_data) - if not code_login_serializer.is_valid(): - raise self.get_invalid_mfa_error() - - try: - self.user_cache = authenticate_second_step_command( - code=code_login_serializer.validated_data['code'], - ephemeral_token=code_login_serializer.validated_data[ - 'ephemeral_token' - ], - ) - except MFAValidationError: - raise self.get_invalid_mfa_error() - - # When login is successful, `django.contrib.auth.login()` expects the - # authentication backend class to be attached to user object. - # See https://github.com/django/django/blob/b87820668e7bd519dbc05f6ee46f551858fb1d6d/django/contrib/auth/__init__.py#L111 - # Since we do not have a bullet-proof way to detect which authentication - # class is the good one, we use the first element of the list - self.user_cache.backend = settings.AUTHENTICATION_BACKENDS[0] - - return self.cleaned_data - - def get_invalid_mfa_error(self): - return forms.ValidationError( - self.error_messages['invalid_code'], - code='invalid_code', - ) - - def get_user(self): - return self.user_cache +from allauth.mfa.adapter import get_adapter +from allauth.mfa.base.forms import AuthenticateForm, ReauthenticateForm +from allauth.mfa.base.internal.flows import check_rate_limit +from allauth.mfa.models import Authenticator +from django.contrib.auth.hashers import check_password + + +class MfaAuthenticateMixin: + + def clean_code(self): + clear_rl = check_rate_limit(self.user) + code = self.cleaned_data['code'] + for auth in Authenticator.objects.filter(user=self.user).exclude( + # WebAuthn cannot validate manual codes. + type=Authenticator.Type.WEBAUTHN + ): + if auth.wrap().validate_code(code): + self.authenticator = auth + clear_rl() + return code + if auth.type == Authenticator.Type.RECOVERY_CODES: + hashed_code = self.validate_migrated_codes(code, auth.wrap()) + if hashed_code is not None: + self.authenticator = auth + clear_rl() + return code + + raise get_adapter().validation_error('incorrect_code') + + def validate_migrated_codes(self, input_code, recovery_codes): + codes = recovery_codes._get_migrated_codes() + if codes is None: + return + if not codes[0].startswith('pbkdf2_sha256$'): + return + # if codes are sha256 hashes do the recovery codes logic + for idx, hashed_code in enumerate(codes): + if check_password(input_code, hashed_code): + recovery_codes.validate_code(hashed_code) + return hashed_code + + +class MfaAuthenticateForm(MfaAuthenticateMixin, AuthenticateForm): + pass + + +class MfaReauthenticateForm(MfaAuthenticateMixin, ReauthenticateForm): + pass diff --git a/kobo/apps/accounts/mfa/migrations/0001_squashed_0004_alter_mfamethod_date_created_and_more.py b/kobo/apps/accounts/mfa/migrations/0001_squashed_0004_alter_mfamethod_date_created_and_more.py new file mode 100644 index 0000000000..ad1fc41950 --- /dev/null +++ b/kobo/apps/accounts/mfa/migrations/0001_squashed_0004_alter_mfamethod_date_created_and_more.py @@ -0,0 +1,86 @@ +# Generated by Django 4.2.24 on 2025-10-15 17:34 + +import django.db.models.deletion +from django.conf import settings +from django.db import migrations, models + +import kpi.models.abstract_models + + +class Migration(migrations.Migration): + + replaces = [ + ('accounts_mfa', '0001_initial'), + ('accounts_mfa', '0002_add_mfa_available_to_user_model'), + ('accounts_mfa', '0003_rename_kobo_mfa_method_model'), + ('accounts_mfa', '0004_alter_mfamethod_date_created_and_more'), + ] + + initial = True + + dependencies = [ + migrations.swappable_dependency(settings.AUTH_USER_MODEL), + ('trench', '0003_auto_20190213_2330'), + ] + + operations = [ + migrations.CreateModel( + name='MfaAvailableToUser', + fields=[ + ( + 'id', + models.AutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name='ID', + ), + ), + ( + 'user', + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + to=settings.AUTH_USER_MODEL, + ), + ), + ], + options={ + 'verbose_name': 'per-user availability', + 'verbose_name_plural': 'per-user availabilities', + }, + ), + migrations.CreateModel( + name='MfaMethod', + fields=[ + ( + 'mfamethod_ptr', + models.OneToOneField( + auto_created=True, + on_delete=django.db.models.deletion.CASCADE, + parent_link=True, + primary_key=True, + serialize=False, + to='trench.mfamethod', + ), + ), + ( + 'date_created', + models.DateTimeField( + default=kpi.models.abstract_models._get_default_datetime + ), + ), + ( + 'date_modified', + models.DateTimeField( + default=kpi.models.abstract_models._get_default_datetime + ), + ), + ('date_disabled', models.DateTimeField(null=True)), + ], + options={ + 'verbose_name': 'MFA Method', + 'verbose_name_plural': 'MFA Methods', + }, + bases=('trench.mfamethod',), + ), + ] diff --git a/kobo/apps/accounts/mfa/migrations/0002_add_mfa_available_to_user_model.py b/kobo/apps/accounts/mfa/migrations/0002_add_mfa_available_to_user_model.py index 3e19a59ed0..e3e23d0742 100644 --- a/kobo/apps/accounts/mfa/migrations/0002_add_mfa_available_to_user_model.py +++ b/kobo/apps/accounts/mfa/migrations/0002_add_mfa_available_to_user_model.py @@ -1,15 +1,15 @@ # Generated by Django 2.2.27 on 2022-07-05 18:51 +import django.db.models.deletion from django.conf import settings from django.db import migrations, models -import django.db.models.deletion class Migration(migrations.Migration): dependencies = [ migrations.swappable_dependency(settings.AUTH_USER_MODEL), - ('mfa', '0001_initial'), + ('accounts_mfa', '0001_initial'), ] operations = [ diff --git a/kobo/apps/accounts/mfa/migrations/0003_rename_kobo_mfa_method_model.py b/kobo/apps/accounts/mfa/migrations/0003_rename_kobo_mfa_method_model.py index 4d619dbe19..dbe7e2dbec 100644 --- a/kobo/apps/accounts/mfa/migrations/0003_rename_kobo_mfa_method_model.py +++ b/kobo/apps/accounts/mfa/migrations/0003_rename_kobo_mfa_method_model.py @@ -9,7 +9,7 @@ class Migration(migrations.Migration): dependencies = [ migrations.swappable_dependency(settings.AUTH_USER_MODEL), ('trench', '0003_auto_20190213_2330'), - ('mfa', '0002_add_mfa_available_to_user_model'), + ('accounts_mfa', '0002_add_mfa_available_to_user_model'), ] operations = [ diff --git a/kobo/apps/accounts/mfa/migrations/0004_alter_mfamethod_date_created_and_more.py b/kobo/apps/accounts/mfa/migrations/0004_alter_mfamethod_date_created_and_more.py index e1f37b436a..9b5c36d646 100644 --- a/kobo/apps/accounts/mfa/migrations/0004_alter_mfamethod_date_created_and_more.py +++ b/kobo/apps/accounts/mfa/migrations/0004_alter_mfamethod_date_created_and_more.py @@ -1,13 +1,14 @@ # Generated by Django 4.2.11 on 2024-07-24 07:16 from django.db import migrations, models + import kpi.models.abstract_models class Migration(migrations.Migration): dependencies = [ - ('mfa', '0003_rename_kobo_mfa_method_model'), + ('accounts_mfa', '0003_rename_kobo_mfa_method_model'), ] operations = [ diff --git a/kobo/apps/accounts/mfa/migrations/0005_rename_mfa_tables.py b/kobo/apps/accounts/mfa/migrations/0005_rename_mfa_tables.py new file mode 100644 index 0000000000..1f2cd315ea --- /dev/null +++ b/kobo/apps/accounts/mfa/migrations/0005_rename_mfa_tables.py @@ -0,0 +1,30 @@ +from django.conf import settings +from django.db import migrations + + +class Migration(migrations.Migration): + + dependencies = [ + migrations.swappable_dependency(settings.AUTH_USER_MODEL), + ('mfa', '0003_authenticator_type_uniq'), + ('accounts_mfa', '0001_squashed_0004_alter_mfamethod_date_created_and_more'), + ] + + operations = [ + migrations.RunSQL( + sql=""" + ALTER TABLE IF EXISTS mfa_mfaavailabletouser + RENAME TO accounts_mfa_mfaavailabletouser; + """, + reverse_sql=""" + ALTER TABLE accounts_mfa_mfaavailabletouser + RENAME TO mfa_mfaavailabletouser;""", + ), + migrations.RunSQL( + sql=""" + ALTER TABLE IF EXISTS mfa_mfamethod + RENAME TO accounts_mfa_mfamethod;""", + reverse_sql=""" + ALTER TABLE accounts_mfa_mfamethod RENAME TO mfa_mfamethod;""", + ), + ] diff --git a/kobo/apps/accounts/mfa/migrations/0006_add_mfa_methods_wrapper_model.py b/kobo/apps/accounts/mfa/migrations/0006_add_mfa_methods_wrapper_model.py new file mode 100644 index 0000000000..a7c4580d22 --- /dev/null +++ b/kobo/apps/accounts/mfa/migrations/0006_add_mfa_methods_wrapper_model.py @@ -0,0 +1,82 @@ +# Generated by Django 4.2.24 on 2025-10-21 00:04 + +import django.db.models.deletion +from django.conf import settings +from django.db import migrations, models + +import kpi.models.abstract_models + + +class Migration(migrations.Migration): + + dependencies = [ + ('accounts_mfa', '0005_rename_mfa_tables'), + ] + + operations = [ + migrations.CreateModel( + name='MfaMethodsWrapper', + fields=[ + ( + 'id', + models.AutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name='ID', + ), + ), + ( + 'date_created', + models.DateTimeField( + default=kpi.models.abstract_models._get_default_datetime + ), + ), + ( + 'date_modified', + models.DateTimeField( + default=kpi.models.abstract_models._get_default_datetime + ), + ), + ('name', models.CharField(max_length=255)), + ('is_active', models.BooleanField(default=False)), + ('date_disabled', models.DateTimeField(null=True)), + ( + 'recovery_codes', + models.ForeignKey( + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name='+', + to='mfa.authenticator', + ), + ), + ('secret', models.CharField(max_length=255)), + ( + 'totp', + models.ForeignKey( + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name='+', + to='mfa.authenticator', + ), + ), + ( + 'user', + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + to=settings.AUTH_USER_MODEL, + ), + ), + ], + options={ + 'verbose_name': 'MFA Method', + 'verbose_name_plural': 'MFA Methods', + }, + ), + migrations.AddConstraint( + model_name='mfamethodswrapper', + constraint=models.UniqueConstraint( + fields=('user', 'name'), name='unique_user_method_name' + ), + ), + ] diff --git a/kobo/apps/accounts/mfa/models.py b/kobo/apps/accounts/mfa/models.py index 51796f765b..0b965d6b56 100644 --- a/kobo/apps/accounts/mfa/models.py +++ b/kobo/apps/accounts/mfa/models.py @@ -1,4 +1,5 @@ # coding: utf-8 +from allauth.mfa.models import Authenticator from django.conf import settings from django.contrib import admin from django.db import models @@ -15,6 +16,7 @@ class MfaAvailableToUser(models.Model): class Meta: verbose_name = 'per-user availability' verbose_name_plural = 'per-user availabilities' + user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE) def __str__(self): @@ -32,14 +34,75 @@ class MfaAvailableToUserAdmin(admin.ModelAdmin): # list_display = ('user',) -class MfaMethod(TrenchMFAMethod, AbstractTimeStampedModel): +class MfaMethodsWrapper(AbstractTimeStampedModel): """ - Extend DjangoTrench model to add created, modified and last disabled date + MFA Methods is a wrapper table that contains references to a TOTP secret + and recovery codes. """ class Meta: verbose_name = 'MFA Method' verbose_name_plural = 'MFA Methods' + constraints = ( + models.UniqueConstraint( + fields=('user', 'name'), + name='unique_user_method_name', + ), + ) + + name = models.CharField(max_length=255) + user = models.ForeignKey( + settings.AUTH_USER_MODEL, + on_delete=models.CASCADE, + related_name='mfa_methods_wrapper', + ) + secret = models.CharField(max_length=255) # Leave room for encryption + totp = models.ForeignKey( + Authenticator, null=True, on_delete=models.SET_NULL, related_name='+' + ) + recovery_codes = models.ForeignKey( + Authenticator, null=True, on_delete=models.SET_NULL, related_name='+' + ) + is_active = models.BooleanField(default=False) + date_disabled = models.DateTimeField(null=True) + + def __str__(self): + return f'{self.user.username}: {self.name=} {self.is_active=}' + + def save( + self, + force_insert=False, + force_update=False, + using=None, + update_fields=None, + ): + self.pk is None + + if not self.is_active and not self.date_disabled: + self.date_disabled = now() + + if self.is_active and self.date_disabled: + self.date_disabled = None + + if update_fields: + update_fields += ['date_disabled'] + + super().save( + force_insert=force_insert, + force_update=force_update, + using=using, + update_fields=update_fields, + ) + + +class MfaMethod(TrenchMFAMethod, AbstractTimeStampedModel): + """ + Extend DjangoTrench model to add created, modified and last disabled date + """ + + class Meta: + verbose_name = 'Trench MFA Method' + verbose_name_plural = 'Trench MFA Methods' date_disabled = models.DateTimeField(null=True) @@ -84,7 +147,7 @@ def delete(self, using=None, keep_parents=False): UserProfile.set_mfa_status(user_id=user_id, is_active=False) -class MfaMethodAdmin(TrenchMFAMethodAdmin): +class ExtendedTrenchMfaMethodAdmin(TrenchMFAMethodAdmin): search_fields = ('user__username',) autocomplete_fields = ['user'] diff --git a/kobo/apps/accounts/mfa/serializers.py b/kobo/apps/accounts/mfa/serializers.py index 5d236d4ff2..c2507abd6e 100644 --- a/kobo/apps/accounts/mfa/serializers.py +++ b/kobo/apps/accounts/mfa/serializers.py @@ -1,6 +1,8 @@ -# coding: utf-8 +from allauth.mfa.totp.internal.auth import TOTP from rest_framework import serializers -from trench.utils import get_mfa_model +from rest_framework.exceptions import NotFound, ValidationError + +from .models import MfaMethodsWrapper class UserMfaMethodSerializer(serializers.ModelSerializer): @@ -9,12 +11,37 @@ class UserMfaMethodSerializer(serializers.ModelSerializer): """ class Meta: - model = get_mfa_model() + model = MfaMethodsWrapper fields = ( 'name', - 'is_primary', 'is_active', 'date_created', 'date_modified', 'date_disabled', ) + + +class TOTPCodeSerializer(serializers.Serializer): + """ + Intended to be used for validating TOTP codes + """ + + code = serializers.CharField() + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.user = self.context['user'] + self.method = self.context['method'] + try: + self.mfamethods = MfaMethodsWrapper.objects.get( + user=self.user, is_active=True, name=self.method + ) + except MfaMethodsWrapper.DoesNotExist: + raise NotFound + self.totp = TOTP(self.mfamethods.totp) + + def validate_code(self, value): + if not self.totp.validate_code(value): + raise ValidationError('Invalid TOTP code') + + return value diff --git a/kobo/apps/accounts/mfa/signals.py b/kobo/apps/accounts/mfa/signals.py index 398fd81e2f..f404e92f13 100644 --- a/kobo/apps/accounts/mfa/signals.py +++ b/kobo/apps/accounts/mfa/signals.py @@ -2,7 +2,7 @@ from django.db.models.signals import pre_delete from django.dispatch import receiver -from .models import MfaAvailableToUser, MfaMethod +from .models import MfaAvailableToUser, MfaMethodsWrapper @receiver(pre_delete, sender=MfaAvailableToUser) @@ -20,7 +20,7 @@ def deactivate_mfa_method_for_user(**kwargs): # Use `.get()` + `.save()` (from model `MfaMethod`) instead of # `.update()` to run some logic inside `.save()`. It makes an extra # query to DB but avoid duplicated code. - mfa_method = MfaMethod.objects.get(user=mfa_available_to_user.user) + mfa_method = MfaMethodsWrapper.objects.get(user=mfa_available_to_user.user) except MfaMethod.DoesNotExist: pass else: diff --git a/kobo/apps/accounts/mfa/templates/mfa/authenticate.html b/kobo/apps/accounts/mfa/templates/mfa/authenticate.html new file mode 100644 index 0000000000..d0c2471a35 --- /dev/null +++ b/kobo/apps/accounts/mfa/templates/mfa/authenticate.html @@ -0,0 +1,57 @@ +{% extends "account/base.html" %} {% load static %} {% load i18n %} {% block content %} + +
+{% endblock %} {% block extra_javascript %} + +{% endblock %} diff --git a/kobo/apps/accounts/mfa/templates/mfa_token.html b/kobo/apps/accounts/mfa/templates/mfa_token.html deleted file mode 100644 index bc83364680..0000000000 --- a/kobo/apps/accounts/mfa/templates/mfa_token.html +++ /dev/null @@ -1,62 +0,0 @@ -{% extends "account/base.html" %} -{% load static %} -{% load i18n %} -{% block content %} - - -{% endblock %} -{% block extra_javascript %} - -{% endblock %} diff --git a/kobo/apps/accounts/mfa/tests/test_api.py b/kobo/apps/accounts/mfa/tests/test_api.py index 281c879feb..ff20f0eb04 100644 --- a/kobo/apps/accounts/mfa/tests/test_api.py +++ b/kobo/apps/accounts/mfa/tests/test_api.py @@ -1,13 +1,13 @@ -# coding: utf-8 from constance.test import override_config from django.urls import reverse from rest_framework import status -from trench.settings import trench_settings -from trench.utils import get_mfa_model from kobo.apps.kobo_auth.shortcuts import User from kpi.tests.kpi_test_case import BaseTestCase -from ..models import MfaAvailableToUser +from ..models import MfaAvailableToUser, MfaMethodsWrapper +from .utils import activate_mfa_for_user, get_mfa_code_for_user + +METHOD = 'app' class MfaApiTestCase(BaseTestCase): @@ -15,24 +15,17 @@ class MfaApiTestCase(BaseTestCase): fixtures = ['test_data'] """ - The purpose of this class is only to cover what the `mfa` app extends from - `django-trench`. (i.e.: displaying dates) - For the MFA API functionalities, see `django-trench` tests. + The purpose of this class is to cover the MFA API actions """ def setUp(self): self.someuser = User.objects.get(username='someuser') # Activate MFA for someuser - get_mfa_model().objects.create( - user=self.someuser, - secret='dummy_mfa_secret', - name='app', - is_primary=True, - is_active=True, - _backup_codes='dummy_encoded_codes', - ) - self.client.login(username='someuser', password='someuser') + activate_mfa_for_user(self.client, self.someuser) + + # Log in + self.client.force_login(self.someuser) def test_user_methods_with_date(self): @@ -50,37 +43,26 @@ def test_user_methods_with_date(self): @override_config(MFA_ENABLED=True) def test_mfa_activation_always_creates_new_secret(self): self.client.login(username='anotheruser', password='anotheruser') - mfa_methods = trench_settings.MFA_METHODS.keys() - for method in mfa_methods: - first_response = self.client.post( - reverse('mfa-activate', args=(method,)) - ) - first_secret = ( - get_mfa_model() - .objects.get(user__username='anotheruser', name=method) - .secret - ) - second_response = self.client.post( - reverse('mfa-activate', args=(method,)) - ) - second_secret = ( - get_mfa_model() - .objects.get(user__username='anotheruser', name=method) - .secret - ) - assert first_secret != second_secret - assert first_response.json() != second_response.json() + first_response = self.client.post(reverse('mfa-activate', args=(METHOD,))) + first_secret = MfaMethodsWrapper.objects.get( + user__username='anotheruser', name=METHOD + ).secret + + # Since it was never confirmed, it will generate another secret + second_response = self.client.post(reverse('mfa-activate', args=(METHOD,))) + second_secret = MfaMethodsWrapper.objects.get( + user__username='anotheruser', name=METHOD + ).secret + assert first_secret != second_secret + assert first_response.json() != second_response.json() @override_config(MFA_ENABLED=True) def test_mfa_whitelisting(self): - method = list(trench_settings.MFA_METHODS.keys())[0] anotheruser = User.objects.get(username='anotheruser') self.client.login(username='anotheruser', password='anotheruser') - # Test when whitelist is disabled - activate_response = self.client.post( - reverse('mfa-activate', args=(method,)) - ) + # Test when whitelist is disabled + activate_response = self.client.post(reverse('mfa-activate', args=(METHOD,))) assert activate_response.status_code == status.HTTP_200_OK # Enable the MFA whitelist by adding a user @@ -88,17 +70,65 @@ def test_mfa_whitelisting(self): user=self.someuser ) - activate_response = self.client.post( - reverse('mfa-activate', args=(method,)) - ) + activate_response = self.client.post(reverse('mfa-activate', args=(METHOD,))) assert activate_response.status_code == status.HTTP_403_FORBIDDEN mfa_availability = MfaAvailableToUser.objects.create(user=anotheruser) - activate_response = self.client.post( - reverse('mfa-activate', args=(method,)) - ) + activate_response = self.client.post(reverse('mfa-activate', args=(METHOD,))) assert activate_response.status_code == status.HTTP_200_OK # Reset MFA whitelist state mfa_availability.delete() someuser_mfa_activation.delete() + + def test_regenerate_codes(self): + response = self.client.post( + reverse('mfa-regenerate', args=(METHOD,)), data={'code': '1234567890'} + ) + assert response.status_code == status.HTTP_400_BAD_REQUEST + + code = get_mfa_code_for_user(self.someuser) + response = self.client.post( + reverse('mfa-regenerate', args=(METHOD,)), data={'code': code} + ) + + assert len(response.data['backup_codes']) == 5 + + def test_deactivate(self): + response = self.client.post( + reverse('mfa-deactivate', args=(METHOD,)), data={'code': '1234567890'} + ) + assert response.status_code == status.HTTP_400_BAD_REQUEST + + code = get_mfa_code_for_user(self.someuser) + response = self.client.post( + reverse('mfa-deactivate', args=(METHOD,)), data={'code': code} + ) + + assert response.status_code == status.HTTP_200_OK + mfamethods = MfaMethodsWrapper.objects.get(user=self.someuser) + assert mfamethods.is_active is False + + def test_reactivate_generates_different_secret(self): + """ + Test that MFA can be deactivated and reactivated/confirmed with different secret + """ + first_secret = MfaMethodsWrapper.objects.get( + user=self.someuser, name=METHOD + ).secret + code = get_mfa_code_for_user(self.someuser) + response = self.client.post( + reverse('mfa-deactivate', args=(METHOD,)), data={'code': code} + ) + assert response.status_code == status.HTTP_200_OK + + self.client.post(reverse('mfa-activate', kwargs={'method': METHOD})) + code = get_mfa_code_for_user(self.someuser) + confirm_response = self.client.post( + reverse('mfa-confirm', kwargs={'method': METHOD}), data={'code': str(code)} + ) + assert confirm_response.status_code == status.HTTP_200_OK + second_secret = MfaMethodsWrapper.objects.get( + user=self.someuser, name=METHOD + ).secret + assert first_secret != second_secret diff --git a/kobo/apps/accounts/mfa/tests/test_dates.py b/kobo/apps/accounts/mfa/tests/test_dates.py index 803f48cfa1..8f7b9e764d 100644 --- a/kobo/apps/accounts/mfa/tests/test_dates.py +++ b/kobo/apps/accounts/mfa/tests/test_dates.py @@ -1,7 +1,6 @@ -# coding: utf-8 from django.utils.timezone import now -from trench.utils import get_mfa_model +from kobo.apps.accounts.mfa.models import MfaMethodsWrapper from kobo.apps.kobo_auth.shortcuts import User from kpi.tests.kpi_test_case import BaseTestCase @@ -15,13 +14,11 @@ def setUp(self): def test_date_disabled_is_none_when_is_active(self): - mfa_method = get_mfa_model().objects.create( + mfa_method = MfaMethodsWrapper.objects.create( user=self.someuser, secret='dummy_mfa_secret', name='app', - is_primary=True, is_active=True, - _backup_codes='dummy_encoded_codes', ) self.assertEqual(mfa_method.date_disabled, None) mfa_method.date_disabled = now() @@ -30,12 +27,11 @@ def test_date_disabled_is_none_when_is_active(self): def test_date_disabled_is_set_when_not_active(self): - mfa_method = get_mfa_model().objects.create( + mfa_method = MfaMethodsWrapper.objects.create( user=self.someuser, secret='dummy_mfa_secret', name='app', is_active=False, - _backup_codes='dummy_encoded_codes', ) self.assertNotEqual(mfa_method.date_disabled, None) mfa_method.date_disabled = None @@ -44,13 +40,11 @@ def test_date_disabled_is_set_when_not_active(self): def test_date_modified(self): - mfa_method = get_mfa_model().objects.create( + mfa_method = MfaMethodsWrapper.objects.create( user=self.someuser, secret='dummy_mfa_secret', name='app', - is_primary=True, is_active=True, - _backup_codes='dummy_encoded_codes', ) date_modified = mfa_method.date_modified mfa_method.save() diff --git a/kobo/apps/accounts/mfa/tests/test_login.py b/kobo/apps/accounts/mfa/tests/test_login.py index 21651dac16..57a488c68f 100644 --- a/kobo/apps/accounts/mfa/tests/test_login.py +++ b/kobo/apps/accounts/mfa/tests/test_login.py @@ -1,13 +1,15 @@ -# coding: utf-8 from allauth.account.models import EmailAddress +from constance.test import override_config from django.conf import settings from django.shortcuts import resolve_url from django.urls import reverse from rest_framework import status -from trench.utils import get_mfa_model from kobo.apps.kobo_auth.shortcuts import User from kpi.tests.kpi_test_case import KpiTestCase +from .utils import activate_mfa_for_user + +METHOD = 'app' class LoginTests(KpiTestCase): @@ -27,14 +29,7 @@ def setUp(self): email_address.save() # Activate MFA for someuser - get_mfa_model().objects.create( - user=self.someuser, - secret='dummy_mfa_secret', - name='app', - is_primary=True, - is_active=True, - _backup_codes='dummy_encoded_codes', - ) + activate_mfa_for_user(self.client, self.someuser) # Ensure `self.client` is not authenticated self.client.logout() @@ -48,7 +43,16 @@ def test_login_with_mfa_enabled(self): 'password': 'someuser', } response = self.client.post(reverse('kobo_login'), data=data) - self.assertContains(response, 'verification token') + self.assertRedirects(response, reverse('mfa_authenticate')) + + @override_config(MFA_ENABLED=False) + def test_mfa_globally_disabled(self): + data = { + 'login': 'someuser', + 'password': 'someuser', + } + response = self.client.post(reverse('kobo_login'), data=data) + self.assertRedirects(response, reverse(settings.LOGIN_REDIRECT_URL)) def test_login_with_mfa_disabled(self): """ @@ -59,9 +63,7 @@ def test_login_with_mfa_disabled(self): 'login': 'anotheruser', 'password': 'anotheruser', } - response = self.client.post( - reverse('kobo_login'), data=data, follow=True - ) + response = self.client.post(reverse('kobo_login'), data=data, follow=True) self.assertEqual(len(response.redirect_chain), 1) redirection, status_code = response.redirect_chain[0] self.assertEqual(status_code, status.HTTP_302_FOUND) diff --git a/kobo/apps/accounts/mfa/tests/test_migration.py b/kobo/apps/accounts/mfa/tests/test_migration.py new file mode 100644 index 0000000000..c67ddf6a14 --- /dev/null +++ b/kobo/apps/accounts/mfa/tests/test_migration.py @@ -0,0 +1,71 @@ +from allauth.account.models import EmailAddress +from allauth.mfa.adapter import get_adapter +from django.conf import settings +from django.urls import reverse +from rest_framework import status +from trench.command.replace_mfa_method_backup_codes import ( + regenerate_backup_codes_for_mfa_method_command, +) +from trench.utils import get_mfa_model + +from kobo.apps.kobo_auth.shortcuts import User +from kpi.tests.kpi_test_case import BaseTestCase +from .utils import get_mfa_code_for_user + + +class MfaMigrationTestCase(BaseTestCase): + """ + Test the migration scripts + """ + + fixtures = ['test_data'] + + def setUp(self): + self.someuser = User.objects.get(username='someuser') + # Confirm someuser's e-mail address as primary and verified + email_address, _ = EmailAddress.objects.get_or_create(user=self.someuser) + email_address.primary = True + email_address.verified = True + email_address.save() + + def test_migrate_trench_data(self): + # Activate Trench MFA for someuser + get_mfa_model().objects.create( + user=self.someuser, + secret='CPALQQLP4JVV6HZOCPKVARERTFRUULN5', + name='app', + is_primary=True, + is_active=True, + ) + backup_codes = list( + regenerate_backup_codes_for_mfa_method_command(self.someuser.id, 'app') + ) + + # Migrate to allauth MFA + adapter = get_adapter() + adapter.migrate_user(self.someuser) + login_data = { + 'login': 'someuser', + 'password': 'someuser', + } + + # Test multiple cases + valid_code = get_mfa_code_for_user(self.someuser) + for code, should_pass_through in [ + (valid_code, True), # TOTP code + (backup_codes[0], True), # Backup + (backup_codes[0], False), # Expired code + (backup_codes[1], True), + (backup_codes[1], False), + (backup_codes[2], True), + ('000111', False), # Invalid code + ('111111', False), # Invalid code + ]: + self.client.post(reverse('kobo_login'), data=login_data) + response = self.client.post(reverse('mfa_authenticate'), {'code': code}) + if should_pass_through: + self.assertRedirects(response, reverse(settings.LOGIN_REDIRECT_URL)) + else: + assert response.status_code == status.HTTP_200_OK + assert 'Incorrect code' in response.content.decode() + self.client.logout() diff --git a/kobo/apps/accounts/mfa/tests/test_signal.py b/kobo/apps/accounts/mfa/tests/test_signal.py index 1c27a8f185..b67b6395cb 100644 --- a/kobo/apps/accounts/mfa/tests/test_signal.py +++ b/kobo/apps/accounts/mfa/tests/test_signal.py @@ -1,7 +1,4 @@ -# coding: utf-8 -from trench.utils import get_mfa_model - -from kobo.apps.accounts.mfa.models import MfaAvailableToUser +from kobo.apps.accounts.mfa.models import MfaAvailableToUser, MfaMethodsWrapper from kobo.apps.kobo_auth.shortcuts import User from kpi.tests.kpi_test_case import BaseTestCase @@ -15,13 +12,11 @@ def setUp(self): def test_mfa_method_disabled_on_per_user_activation_deletion(self): - mfa_method = get_mfa_model().objects.create( + mfa_method = MfaMethodsWrapper.objects.create( user=self.someuser, secret='dummy_mfa_secret', name='app', - is_primary=True, is_active=True, - _backup_codes='dummy_encoded_codes', ) self.assertEqual(mfa_method.date_disabled, None) self.assertEqual(mfa_method.is_active, True) diff --git a/kobo/apps/accounts/mfa/tests/utils.py b/kobo/apps/accounts/mfa/tests/utils.py new file mode 100644 index 0000000000..d690a383b1 --- /dev/null +++ b/kobo/apps/accounts/mfa/tests/utils.py @@ -0,0 +1,24 @@ +import pyotp +from allauth.mfa.adapter import get_adapter +from django.urls import reverse + +from ..models import MfaMethodsWrapper + + +def get_mfa_code_for_user(user): + mfa_method = MfaMethodsWrapper.objects.get(user=user, name='app') + adapter = get_adapter() + secret = adapter.decrypt(mfa_method.secret) + totp = pyotp.TOTP(secret) + code = totp.now() + return code + + +def activate_mfa_for_user(client, user): + client.force_login(user) + client.post(reverse('mfa-activate', kwargs={'method': 'app'})) + code = get_mfa_code_for_user(user) + client.post( + reverse('mfa-confirm', kwargs={'method': 'app'}), data={'code': str(code)} + ) + client.logout() diff --git a/kobo/apps/accounts/mfa/urls.py b/kobo/apps/accounts/mfa/urls.py index 5a69e8a39d..4723cb6e5e 100644 --- a/kobo/apps/accounts/mfa/urls.py +++ b/kobo/apps/accounts/mfa/urls.py @@ -1,15 +1,16 @@ # coding: utf-8 -from django.urls import include, path +from django.urls import path from .views import ( MfaListUserMethodsView, MfaLoginView, - MfaTokenView, MfaMethodActivationView, + MfaMethodConfirmView, + MfaMethodDeactivateView, + MfaMethodRegenerateCodesView, ) urlpatterns = [ - path('accounts/login/mfa/', MfaTokenView.as_view(), name='mfa_token'), path('accounts/login/', MfaLoginView.as_view(), name='kobo_login'), path( 'api/v2/auth/mfa/user-methods/', @@ -21,5 +22,19 @@ MfaMethodActivationView.as_view(), name='mfa-activate', ), - path('api/v2/auth/', include('trench.urls')), + path( + 'api/v2/auth/