Skip to content

Commit

Permalink
Use the async auth api methods if they exist
Browse files Browse the repository at this point in the history
  • Loading branch information
bigfootjon committed Apr 15, 2024
1 parent 42bf4a3 commit b71a5ad
Showing 1 changed file with 228 additions and 110 deletions.
338 changes: 228 additions & 110 deletions channels/auth.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import django
from django.conf import settings
from django.contrib.auth import (
BACKEND_SESSION_KEY,
Expand All @@ -16,127 +17,244 @@
from channels.middleware import BaseMiddleware
from channels.sessions import CookieMiddleware, SessionMiddleware

if django.VERSION >= (5, 1):

@database_sync_to_async
def get_user(scope):
"""
Return the user model instance associated with the given scope.
If no user is retrieved, return an instance of `AnonymousUser`.
"""
# postpone model import to avoid ImproperlyConfigured error before Django
# setup is complete.
from django.contrib.auth.models import AnonymousUser

if "session" not in scope:
raise ValueError(
"Cannot find session in scope. You should wrap your consumer in "
"SessionMiddleware."
)
session = scope["session"]
user = None
try:
user_id = _get_user_session_key(session)
backend_path = session[BACKEND_SESSION_KEY]
except KeyError:
pass
else:
if backend_path in settings.AUTHENTICATION_BACKENDS:
backend = load_backend(backend_path)
user = backend.get_user(user_id)
# Verify the session
if hasattr(user, "get_session_auth_hash"):
session_hash = session.get(HASH_SESSION_KEY)
session_hash_verified = session_hash and constant_time_compare(
session_hash, user.get_session_auth_hash()
async def get_user(scope):
"""
Return the user model instance associated with the given scope.
If no user is retrieved, return an instance of `AnonymousUser`.
"""
# postpone model import to avoid ImproperlyConfigured error before Django
# setup is complete.
from django.contrib.auth.models import AnonymousUser

if "session" not in scope:
raise ValueError(
"Cannot find session in scope. You should wrap your consumer in "
"SessionMiddleware."
)
session = scope["session"]
user = None
try:
user_id = _get_user_session_key(session)
backend_path = await session.aget(BACKEND_SESSION_KEY)
except KeyError:
pass
else:
if backend_path in settings.AUTHENTICATION_BACKENDS:
backend = load_backend(backend_path)
user = await backend.aget_user(user_id)
# Verify the session
if hasattr(user, "get_session_auth_hash"):
session_hash = await session.aget(HASH_SESSION_KEY)
session_hash_verified = session_hash and constant_time_compare(
session_hash, user.get_session_auth_hash()
)
if not session_hash_verified:
await session.aflush()
user = None
return user or AnonymousUser()

async def login(scope, user, backend=None):
"""
Persist a user id and a backend in the request.
This way a user doesn't have to re-authenticate on every request.
Note that data set during the anonymous session is retained when the user
logs in.
"""
if "session" not in scope:
raise ValueError(
"Cannot find session in scope. You should wrap your consumer in "
"SessionMiddleware."
)
session = scope["session"]
session_auth_hash = ""
if user is None:
user = scope.get("user", None)
if user is None:
raise ValueError(
"User must be passed as an argument or must be present in the scope."
)
if hasattr(user, "get_session_auth_hash"):
session_auth_hash = user.get_session_auth_hash()
if SESSION_KEY in session:
if _get_user_session_key(session) != user.pk or (
session_auth_hash
and not constant_time_compare(
await session.aget(HASH_SESSION_KEY, ""), session_auth_hash
)
):
# To avoid reusing another user's session, create a new, empty
# session if the existing session corresponds to a different
# authenticated user.
await session.aflush()
else:
await session.acycle_key()
try:
backend = backend or user.backend
except AttributeError:
backends = _get_backends(return_tuples=True)
if len(backends) == 1:
_, backend = backends[0]
else:
raise ValueError(
"You have multiple authentication backends configured and "
"therefore must provide the `backend` "
"argument or set the `backend` attribute on the user."
)
if not session_hash_verified:
session.flush()
user = None
return user or AnonymousUser()
await session.aset(SESSION_KEY, user._meta.pk.value_to_string(user))
await session.aset(BACKEND_SESSION_KEY, backend)
await session.aset(HASH_SESSION_KEY, session_auth_hash)
scope["user"] = user
# note this does not reset the CSRF_COOKIE/Token
await user_logged_in.asend(sender=user.__class__, request=None, user=user)

async def logout(scope):
"""
Remove the authenticated user's ID from the request and flush their session
data.
"""
# postpone model import to avoid ImproperlyConfigured error before Django
# setup is complete.
from django.contrib.auth.models import AnonymousUser

@database_sync_to_async
def login(scope, user, backend=None):
"""
Persist a user id and a backend in the request.
This way a user doesn't have to re-authenticate on every request.
Note that data set during the anonymous session is retained when the user
logs in.
"""
if "session" not in scope:
raise ValueError(
"Cannot find session in scope. You should wrap your consumer in "
"SessionMiddleware."
)
session = scope["session"]
session_auth_hash = ""
if user is None:
if "session" not in scope:
raise ValueError(
"Login cannot find session in scope. You should wrap your "
"consumer in SessionMiddleware."
)
session = scope["session"]
# Dispatch the signal before the user is logged out so the receivers have a
# chance to find out *who* logged out.
user = scope.get("user", None)
if user is None:
raise ValueError(
"User must be passed as an argument or must be present in the scope."
)
if hasattr(user, "get_session_auth_hash"):
session_auth_hash = user.get_session_auth_hash()
if SESSION_KEY in session:
if _get_user_session_key(session) != user.pk or (
session_auth_hash
and not constant_time_compare(
session.get(HASH_SESSION_KEY, ""), session_auth_hash
if hasattr(user, "is_authenticated") and not user.is_authenticated:
user = None
if user is not None:
await user_logged_out.asend(sender=user.__class__, request=None, user=user)
await session.aflush()
if "user" in scope:
scope["user"] = AnonymousUser()

else:

@database_sync_to_async
def get_user(scope):
"""
Return the user model instance associated with the given scope.
If no user is retrieved, return an instance of `AnonymousUser`.
"""
# postpone model import to avoid ImproperlyConfigured error before Django
# setup is complete.
from django.contrib.auth.models import AnonymousUser

if "session" not in scope:
raise ValueError(
"Cannot find session in scope. You should wrap your consumer in "
"SessionMiddleware."
)
):
# To avoid reusing another user's session, create a new, empty
# session if the existing session corresponds to a different
# authenticated user.
session.flush()
else:
session.cycle_key()
try:
backend = backend or user.backend
except AttributeError:
backends = _get_backends(return_tuples=True)
if len(backends) == 1:
_, backend = backends[0]
session = scope["session"]
user = None
try:
user_id = _get_user_session_key(session)
backend_path = session[BACKEND_SESSION_KEY]
except KeyError:
pass
else:
if backend_path in settings.AUTHENTICATION_BACKENDS:
backend = load_backend(backend_path)
user = backend.get_user(user_id)
# Verify the session
if hasattr(user, "get_session_auth_hash"):
session_hash = session.get(HASH_SESSION_KEY)
session_hash_verified = session_hash and constant_time_compare(
session_hash, user.get_session_auth_hash()
)
if not session_hash_verified:
session.flush()
user = None
return user or AnonymousUser()

@database_sync_to_async
def login(scope, user, backend=None):
"""
Persist a user id and a backend in the request.
This way a user doesn't have to re-authenticate on every request.
Note that data set during the anonymous session is retained when the user
logs in.
"""
if "session" not in scope:
raise ValueError(
"Cannot find session in scope. You should wrap your consumer in "
"SessionMiddleware."
)
session = scope["session"]
session_auth_hash = ""
if user is None:
user = scope.get("user", None)
if user is None:
raise ValueError(
"You have multiple authentication backends configured and "
"therefore must provide the `backend` "
"argument or set the `backend` attribute on the user."
"User must be passed as an argument or must be present in the scope."
)
session[SESSION_KEY] = user._meta.pk.value_to_string(user)
session[BACKEND_SESSION_KEY] = backend
session[HASH_SESSION_KEY] = session_auth_hash
scope["user"] = user
# note this does not reset the CSRF_COOKIE/Token
user_logged_in.send(sender=user.__class__, request=None, user=user)
if hasattr(user, "get_session_auth_hash"):
session_auth_hash = user.get_session_auth_hash()
if SESSION_KEY in session:
if _get_user_session_key(session) != user.pk or (
session_auth_hash
and not constant_time_compare(
session.get(HASH_SESSION_KEY, ""), session_auth_hash
)
):
# To avoid reusing another user's session, create a new, empty
# session if the existing session corresponds to a different
# authenticated user.
session.flush()
else:
session.cycle_key()
try:
backend = backend or user.backend
except AttributeError:
backends = _get_backends(return_tuples=True)
if len(backends) == 1:
_, backend = backends[0]
else:
raise ValueError(
"You have multiple authentication backends configured and "
"therefore must provide the `backend` "
"argument or set the `backend` attribute on the user."
)
session[SESSION_KEY] = user._meta.pk.value_to_string(user)
session[BACKEND_SESSION_KEY] = backend
session[HASH_SESSION_KEY] = session_auth_hash
scope["user"] = user
# note this does not reset the CSRF_COOKIE/Token
user_logged_in.send(sender=user.__class__, request=None, user=user)

@database_sync_to_async
def logout(scope):
"""
Remove the authenticated user's ID from the request and flush their session
data.
"""
# postpone model import to avoid ImproperlyConfigured error before Django
# setup is complete.
from django.contrib.auth.models import AnonymousUser

@database_sync_to_async
def logout(scope):
"""
Remove the authenticated user's ID from the request and flush their session
data.
"""
# postpone model import to avoid ImproperlyConfigured error before Django
# setup is complete.
from django.contrib.auth.models import AnonymousUser

if "session" not in scope:
raise ValueError(
"Login cannot find session in scope. You should wrap your "
"consumer in SessionMiddleware."
)
session = scope["session"]
# Dispatch the signal before the user is logged out so the receivers have a
# chance to find out *who* logged out.
user = scope.get("user", None)
if hasattr(user, "is_authenticated") and not user.is_authenticated:
user = None
if user is not None:
user_logged_out.send(sender=user.__class__, request=None, user=user)
session.flush()
if "user" in scope:
scope["user"] = AnonymousUser()
if "session" not in scope:
raise ValueError(
"Login cannot find session in scope. You should wrap your "
"consumer in SessionMiddleware."
)
session = scope["session"]
# Dispatch the signal before the user is logged out so the receivers have a
# chance to find out *who* logged out.
user = scope.get("user", None)
if hasattr(user, "is_authenticated") and not user.is_authenticated:
user = None
if user is not None:
user_logged_out.send(sender=user.__class__, request=None, user=user)
session.flush()
if "user" in scope:
scope["user"] = AnonymousUser()


def _get_user_session_key(session):
Expand Down

0 comments on commit b71a5ad

Please sign in to comment.