|
| 1 | +from typing import Optional |
| 2 | + |
| 3 | +from allauth.mfa.adapter import DefaultMFAAdapter |
| 4 | +from allauth.mfa.models import Authenticator |
| 5 | +from constance import config |
| 6 | +from django.conf import settings |
| 7 | + |
| 8 | +from ..utils import user_has_inactive_paid_subscription |
| 9 | +from .models import MfaMethod, MfaMethodsWrapper |
| 10 | +from .permissions import mfa_allowed_for_user |
| 11 | + |
| 12 | + |
| 13 | +class MfaAdapter(DefaultMFAAdapter): |
| 14 | + |
| 15 | + def is_mfa_enabled(self, user, types=None) -> bool: |
| 16 | + # NOTE: This is a temporary thing. We are migrating users to the allauth tables |
| 17 | + # When the migration is done it won't be necessary. |
| 18 | + self.migrate_user(user) |
| 19 | + mfa_active_super = super().is_mfa_enabled(user, types) |
| 20 | + mfa_active = ( |
| 21 | + mfa_active_super |
| 22 | + and MfaMethodsWrapper.objects.filter(user=user, is_active=True).first() |
| 23 | + is not None |
| 24 | + ) |
| 25 | + mfa_allowed = mfa_allowed_for_user(user) |
| 26 | + inactive_subscription = user_has_inactive_paid_subscription(user.username) |
| 27 | + return mfa_active and (mfa_allowed or inactive_subscription) |
| 28 | + |
| 29 | + def get_totp_label(self, user) -> str: |
| 30 | + """Returns the label used for representing the given user in a TOTP QR |
| 31 | + code. |
| 32 | + """ |
| 33 | + return f'{config.MFA_ISSUER_NAME}-{user.username}' |
| 34 | + |
| 35 | + def get_totp_issuer(self) -> str: |
| 36 | + """Returns the TOTP issuer name that will be contained in the TOTP QR |
| 37 | + code. |
| 38 | + """ |
| 39 | + return config.MFA_ISSUER_NAME |
| 40 | + |
| 41 | + def migrate_user( |
| 42 | + self, user: settings.AUTH_USER_MODEL, mfa_method: MfaMethod = None |
| 43 | + ) -> Optional[MfaMethodsWrapper]: |
| 44 | + """Migrate user MFA data from trench tables to allauth tables""" |
| 45 | + if not mfa_method: |
| 46 | + mfa_method = ( |
| 47 | + MfaMethod.objects.filter(name='app', user=user, is_active=True) |
| 48 | + .order_by('is_primary') |
| 49 | + .first() |
| 50 | + ) |
| 51 | + if not mfa_method: |
| 52 | + return |
| 53 | + authenticators = Authenticator.objects.filter(user_id=user.id) |
| 54 | + types_ok = {a.type for a in authenticators} == { |
| 55 | + Authenticator.Type.TOTP, |
| 56 | + Authenticator.Type.RECOVERY_CODES, |
| 57 | + } |
| 58 | + # If allauth MFA Authenticators already exist, exit |
| 59 | + if types_ok: |
| 60 | + return |
| 61 | + for authenticator in authenticators: |
| 62 | + authenticator.delete() |
| 63 | + |
| 64 | + encrypted_secret = self.encrypt(mfa_method.secret) |
| 65 | + totp_authenticator = Authenticator.objects.create( |
| 66 | + user_id=mfa_method.user_id, |
| 67 | + type=Authenticator.Type.TOTP, |
| 68 | + data={'secret': encrypted_secret}, |
| 69 | + ) |
| 70 | + recovery_codes = Authenticator.objects.create( |
| 71 | + user_id=mfa_method.user_id, |
| 72 | + type=Authenticator.Type.RECOVERY_CODES, |
| 73 | + data={ |
| 74 | + 'migrated_codes': [self.encrypt(c) for c in mfa_method.backup_codes], |
| 75 | + 'used_mask': 0, |
| 76 | + }, |
| 77 | + ) |
| 78 | + mfa_method_wrapper = MfaMethodsWrapper.objects.create( |
| 79 | + name='app', |
| 80 | + user=user, |
| 81 | + is_active=True, |
| 82 | + totp=totp_authenticator, |
| 83 | + recovery_codes=recovery_codes, |
| 84 | + secret=encrypted_secret, |
| 85 | + ) |
| 86 | + return mfa_method_wrapper |
0 commit comments