diff --git a/.env.sample b/.env.sample
index 651fd88b8..318c9a99d 100644
--- a/.env.sample
+++ b/.env.sample
@@ -8,21 +8,23 @@ DB_HOST="localhost"
DB_PORT=5432
MYSQL_DRIVER=
DB_URL=postgresql://username:password@localhost:5432/test
-SECRET_KEY = ""
-ALGORITHM = HS256
-ACCESS_TOKEN_EXPIRE_MINUTES = 3000
+SECRET_KEY=""
+ALGORITHM=HS256
+ACCESS_TOKEN_EXPIRE_MINUTES=3000
JWT_REFRESH_EXPIRY=7
APP_URL=
GOOGLE_CLIENT_ID=""
GOOGLE_CLIENT_SECRET=""
-FRONTEND_URL='http://127.0.0.1:3000/login-success'
+FRONTEND_URL='https://python-fastapi.boilerplate.hng.tech'
+
+SERVER_PORT_NUMBER=7001
TESTING=''
-MAIL_USERNAME=""
-MAIL_PASSWORD=""
+MAIL_USERNAME="MAIL_USERNAME"
+MAIL_PASSWORD="MAIL_PASSWORD"
MAIL_FROM="dummy@gmail.com"
MAIL_PORT=465
MAIL_SERVER="smtp.gmail.com"
@@ -40,6 +42,14 @@ STRIPE_WEBHOOK_SECRET=""
MAILJET_API_KEY='MAIL JET API KEY'
MAILJET_API_SECRET='SECRET KEY'
+
+REDIS_HOST='REDIS_HOST'
+REDIS_PORT=11305
+REDIS_PASSWORD='REDIS_PASSWORD'
+REDIS_DB=0
+
APP_NAME="fastapi_boilerplate"
+
TELEX_WEBHOOK_URL=""
+
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 660fbc2fa..0dac308ff 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -5,7 +5,6 @@ on:
types: [opened, synchronize, reopened]
paths-ignore:
- "README.md"
- # - ".github/workflows/**"
jobs:
build-and-test:
@@ -21,6 +20,15 @@ jobs:
ports:
- 5432:5432
+ redis:
+ image: redis:latest
+ ports:
+ - 6379:6379
+
+ env:
+ REDIS_PASSWORD: ${{ secrets.REDIS_PASSWORD }}
+ REDIS_HOST: ${{ secrets.REDIS_HOST }}
+
steps:
- name: Checkout code
uses: actions/checkout@v4
@@ -45,6 +53,17 @@ jobs:
- name: Copy env file
run: cp .env.sample .env
+ - name: Verify Redis secrets
+ run: |
+ echo "REDIS_HOST=${{ secrets.REDIS_HOST }}"
+ echo "REDIS_PORT=${{ secrets.REDIS_PORT }}"
+ echo "REDIS_DB=${{ secrets.REDIS_DB }}"
+ echo "REDIS_PASSWORD=${{ secrets.REDIS_PASSWORD }}"
+
+ - name: Test Redis connection
+ run: |
+ redis-cli -h $REDIS_HOST -p $REDIS_PORT ping || echo "Redis connection failed"
+
- name: Run app
run: |
python3 main.py &
@@ -65,4 +84,4 @@ jobs:
- name: Run tests
run: |
- PYTHONPATH=. pytest
\ No newline at end of file
+ PYTHONPATH=. pytest --disable-warnings --maxfail=3
diff --git a/api/core/dependencies/email/templates/email-verification.html b/api/core/dependencies/email/templates/email-verification.html
index 26621bbc0..bb561a88f 100644
--- a/api/core/dependencies/email/templates/email-verification.html
+++ b/api/core/dependencies/email/templates/email-verification.html
@@ -1,7 +1,8 @@
{% extends 'base.html' %}
{% block title %}Email Verification{% endblock %}
-{% block style %}{% endblock %}
+{% block style %}
+{% endblock %}
{% block content %}
-
Hi John Doe,
+
Hi {{first_name}}, {{last_name}}
Thanks for registering your account with us Boilerplate. Before we get
started, we just need to confirm that this is you.
@@ -23,11 +24,8 @@
Or copy this link:
-
- https://carbonated-umbra-a35.notion.site/Language-Learning-AI-game-608b687875cf4b48a9a0194ee82ae17d
+
+ {{ cta_link }}
diff --git a/api/core/dependencies/redis_cache.py b/api/core/dependencies/redis_cache.py
new file mode 100644
index 000000000..6c3fe90ce
--- /dev/null
+++ b/api/core/dependencies/redis_cache.py
@@ -0,0 +1,22 @@
+import redis
+from api.utils.settings import settings
+import logging
+
+_redis_client = None
+
+def get_redis_client():
+ global _redis_client
+ if _redis_client is None:
+ try:
+ _redis_client = redis.StrictRedis(
+ host=settings.REDIS_HOST,
+ port=settings.REDIS_PORT,
+ password=settings.REDIS_PASSWORD,
+ db=settings.REDIS_DB,
+ decode_responses=True
+ )
+ _redis_client.ping() # Test connection first
+ except (redis.ConnectionError, redis.AuthenticationError) as e:
+ logging.warning(f"Redis connection failed: {e}")
+ _redis_client = None # Reset the client
+ return _redis_client
diff --git a/api/utils/settings.py b/api/utils/settings.py
index 9b064f608..c5a8969d4 100644
--- a/api/utils/settings.py
+++ b/api/utils/settings.py
@@ -14,6 +14,16 @@ class Settings(BaseSettings):
ACCESS_TOKEN_EXPIRE_MINUTES: int = config("ACCESS_TOKEN_EXPIRE_MINUTES")
JWT_REFRESH_EXPIRY: int = config("JWT_REFRESH_EXPIRY")
+ # Redis configurations
+ REDIS_HOST: str = config("REDIS_HOST")
+ REDIS_DB: int = config("REDIS_DB", cast=int)
+ REDIS_PORT: int = config("REDIS_PORT", cast=int)
+ REDIS_PASSWORD: str = config("REDIS_PASSWORD")
+
+ FRONTEND_URL: str = config("FRONTEND_URL")
+ SERVER_PORT_NUMBER: int = config("SERVER_PORT_NUMBER", cast=int)
+
+
# Database configurations
DB_HOST: str = config("DB_HOST")
DB_PORT: int = config("DB_PORT", cast=int)
diff --git a/api/utils/success_response.py b/api/utils/success_response.py
index 8d773967f..7579a44eb 100644
--- a/api/utils/success_response.py
+++ b/api/utils/success_response.py
@@ -11,6 +11,7 @@ def success_response(status_code: int, message: str, data: Optional[dict] = None
"status_code": status_code,
"message": message,
"data": data or {} # Ensure data is always a dictionary
+
}
return JSONResponse(status_code=status_code, content=jsonable_encoder(response_data))
diff --git a/api/v1/routes/auth.py b/api/v1/routes/auth.py
index 2204b8876..39c632452 100644
--- a/api/v1/routes/auth.py
+++ b/api/v1/routes/auth.py
@@ -3,7 +3,10 @@
from fastapi.responses import JSONResponse
from jose import ExpiredSignatureError, JWTError
from slowapi import Limiter
+from redis import Redis
+from api.core.dependencies.redis_cache import get_redis_client
from slowapi.util import get_remote_address
+from api.utils.settings import settings
from fastapi import (
BackgroundTasks,
@@ -19,7 +22,7 @@
from typing import Annotated
from api.core.dependencies.email_sender import send_email
-from api.utils.success_response import auth_response, success_response
+from api.utils.success_response import auth_response, success_response, fail_response
from api.utils.send_mail import send_magic_link
from api.v1.models import User
from api.v1.schemas.user import Token, UserEmailSender
@@ -31,7 +34,6 @@
UserData2,
)
from api.v1.schemas.token import TokenRequest
-
from api.v1.schemas.user import (MagicLinkRequest,
ChangePasswordSchema,
AuthMeResponse)
@@ -49,13 +51,15 @@
TOTPDeviceDataSchema,
)
from api.v1.services.totp import totp_service
-from api.utils.settings import settings
auth = APIRouter(prefix="/auth", tags=["Authentication"])
# Initialize rate limiter
limiter = Limiter(key_func=get_remote_address)
+# Initialize Redis client
+redis_client = get_redis_client()
+
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@@ -71,67 +75,133 @@ def register(
):
"""Endpoint for a user to register their account"""
- base_url = str(request.base_url).strip("/")
- # Create user account
- user = user_service.create(db=db, schema=user_schema)
+ try:
+ # Check if user already exists
+ existing_user = user_service.get_user_by_email(db, email=user_schema.email)
+ if existing_user:
+
+ return fail_response(
+ status_code=400,
+ message="User with this email already exists",
+ data={
+ 'user_email': user_schema.email
+ }
+ )
+ # Generate verification token
+ verification_token = AuthService.generate_verification_token()
+ logger.info(f"Generated Token: {verification_token}")
+
+ # Check if the user email is already cached in Redis
+ redis_key = f"pending_user:{user_schema.email}"
+ cached_user = redis_client.hgetall(redis_key)
+
+
+ if cached_user:
+ verification_token = cached_user.get('token')
+ else:
+ verification_token = AuthService.generate_verification_token()
+ logger.info(f"Generated Token Two: {verification_token}")
+ redis_client.hmset(redis_key, {
+ "email": user_schema.email,
+ "password": user_schema.password,
+ "first_name": user_schema.first_name,
+ "last_name": user_schema.last_name,
+ "token": verification_token
+ })
+ redis_client.expire(redis_key, 900)
+
+ cta_link = f'{settings.FRONTEND_URL}/verify?email={user_schema.email}&token={verification_token}'
+ background_tasks.add_task(
+ send_email,
+ recipient=user_schema.email,
+ template_name='email-verification.html',
+ subject='Verify Your Email Address',
+ context={
+ 'first_name': user_schema.first_name,
+ 'last_name': user_schema.last_name,
+ 'cta_link': cta_link
+ }
+ )
- verification_token = user_service.create_verification_token(user.id)
- verification_link = f"{base_url}/api/v1/auth/verify-email?token={verification_token}"
- access_token = user_service.create_access_token(user_id=user.id)
- refresh_token = user_service.create_refresh_token(user_id=user.id)
- cta_link = "https://anchor-python.teams.hng.tech/about-us"
+ return success_response(
+ status_code=201,
+ message=f"Verification email sent. Please check your inbox at {user_schema.email}",
+ data={
+ 'user': {
+ "email": user_schema.email,
+ 'first_name': user_schema.first_name,
+ 'last_name': user_schema.last_name,
+ 'is_superadmin': 'false'
+ }
+
+ }
+ )
+ except AttributeError as e:
+ logger.warning(f"Redis Connection failed: {e}")
+ base_url = str(request.base_url).strip("/")
+ # Create user account
+ user = user_service.create(db=db, schema=user_schema)
- # create an organization for the user
- org = CreateUpdateOrganisation(
- name=f"{user.email}'s Organisation", email=user.email
- )
- organisation_service.create(db=db, schema=org, user=user)
- user_organizations = organisation_service.retrieve_user_organizations(user, db)
- # Create access and refresh tokens
- access_token = user_service.create_access_token(user_id=user.id)
- refresh_token = user_service.create_refresh_token(user_id=user.id)
- cta_link = f"{settings.ANCHOR_PYTHON_BASE_URL}/about-us"
+ verification_token = user_service.create_verification_token(user.id)
+ verification_link = f"{base_url}/api/v1/auth/verify-email?token={verification_token}"
+ access_token = user_service.create_access_token(user_id=user.id)
+ refresh_token = user_service.create_refresh_token(user_id=user.id)
+ cta_link = "https://anchor-python.teams.hng.tech/about-us"
- # Send email in the background
- background_tasks.add_task(
- send_email,
- recipient=user.email,
- template_name="welcome.html",
- subject="Welcome to HNG Boilerplate",
- context={
- "first_name": user.first_name,
- "last_name": user.last_name,
- 'verification_link': verification_link,
- "cta_link": cta_link,
- },
- )
+ # create an organization for the user
+ org = CreateUpdateOrganisation(
+ name=f"{user.email}'s Organisation", email=user.email
+ )
+ organisation_service.create(db=db, schema=org, user=user)
+ user_organizations = organisation_service.retrieve_user_organizations(user, db)
+
+ # Create access and refresh tokens
+ access_token = user_service.create_access_token(user_id=user.id)
+ refresh_token = user_service.create_refresh_token(user_id=user.id)
+ cta_link = f"{settings.ANCHOR_PYTHON_BASE_URL}/about-us"
+
+
+ # Send email in the background
+ background_tasks.add_task(
+ send_email,
+ recipient=user.email,
+ template_name="welcome.html",
+ subject="Welcome to HNG Boilerplate",
+ context={
+ "first_name": user.first_name,
+ "last_name": user.last_name,
+ 'verification_link': verification_link,
+ "cta_link": cta_link,
+ },
+ )
- response = auth_response(
- status_code=201,
- message="User created successfully",
- access_token=access_token,
- data={
- "user": jsonable_encoder(
- user, exclude=["password", "is_deleted", "is_verified", "updated_at"]
- ),
- "organisations": user_organizations,
- },
- )
+ response = auth_response(
+ status_code=201,
+ message="User created successfully",
+ access_token=access_token,
+ data={
+ "user": jsonable_encoder(
+ user, exclude=["password", "is_deleted", "is_verified", "updated_at"]
+ ),
+ "organisations": user_organizations,
+ },
+ )
+
+ # Add refresh token to cookies
+ response.set_cookie(
+ key="refresh_token",
+ value=refresh_token,
+ expires=timedelta(days=60),
+ httponly=True,
+ secure=True,
+ samesite="none",
+ )
+ return response
- # Add refresh token to cookies
- response.set_cookie(
- key="refresh_token",
- value=refresh_token,
- expires=timedelta(days=60),
- httponly=True,
- secure=True,
- samesite="none",
- )
- return response
@@ -186,48 +256,113 @@ def resend_verification_email(request: Request, data: UserEmailSender, backgroun
@auth.post(path="/register-super-admin", status_code=status.HTTP_201_CREATED, response_model=auth_response)
-@limiter.limit("5/minute") # Limit to 5 requests per minute per IP
-def register_as_super_admin(
- request: Request, user: UserCreate, db: Session = Depends(get_db)
-):
+@limiter.limit("1000/minute") # Limit to 5 requests per minute per IP
+def register_as_super_admin(request: Request, background_tasks: BackgroundTasks, user_schema: UserCreate, db: Session = Depends(get_db)):
"""Endpoint for super admin creation"""
- user = user_service.create_admin(db=db, schema=user)
- # create an organization for the user
- org = CreateUpdateOrganisation(
- name=f"{user.email}'s Organisation", email=user.email
- )
- organisation_service.create(db=db, schema=org, user=user)
- user_organizations = organisation_service.retrieve_user_organizations(user, db)
-
- # Create access and refresh tokens
- access_token = user_service.create_access_token(user_id=user.id)
- refresh_token = user_service.create_refresh_token(user_id=user.id)
+ try:
- response = auth_response(
- status_code=201,
- message="User created successfully",
- access_token=access_token,
- data={
- "user": jsonable_encoder(
- user, exclude=["password", "is_deleted", "is_verified", "updated_at"]
- ),
- "organisations": user_organizations,
- },
- )
+ # Check if user already exists
+ existing_user = user_service.get_user_by_email(db, email=user_schema.email)
+ if existing_user:
+ return fail_response(
+ status_code=400,
+ message="User with this email already exists",
+ data={
+ 'user': {
+ 'email': user_schema.email,
+ 'first_name': user_schema.first_name,
+ 'last_name': user_schema.last_name
+ }
+ }
+ )
- # Add refresh token to cookies
- response.set_cookie(
- key="refresh_token",
- value=refresh_token,
- expires=timedelta(days=60),
- httponly=True,
- secure=True,
- samesite="none",
- )
+ # Generate verification token
+ verification_token = AuthService.generate_verification_token()
+ print(f"Generated Token: {verification_token}")
+
+ # Check if the user email is already cached in Redis
+ redis_key = f"pending_user:{user_schema.email}"
+ cached_user = redis_client.hgetall(redis_key)
+
+ if cached_user:
+ # Use the existing token if the cache hasn't expired
+ verification_token = cached_user.get('token')
+ else:
+ # Generate a new token and cache user details (15 mins expiry)
+ verification_token = AuthService.generate_verification_token()
+ redis_client.hmset(redis_key, {
+ "email": user_schema.email,
+ "password": user_schema.password,
+ "first_name": user_schema.first_name,
+ "last_name": user_schema.last_name,
+ "token": verification_token,
+ "is_superadmin": "true"
+ })
+ redis_client.expire(redis_key, 900)
+
+ # Send email verification link (reuse existing token or use new one)
+ cta_link = f'{settings.FRONTEND_URL}/verify?email={user_schema.email}&token={verification_token}'
+ background_tasks.add_task(
+ send_email,
+ recipient=user_schema.email,
+ template_name='email-verification.html',
+ subject='Verify Your Email Address',
+ context={
+ 'first_name': user_schema.first_name,
+ 'last_name': user_schema.first_name,
+ 'cta_link': cta_link
+ }
+ )
+ return success_response(
+ status_code=201,
+ message=f"Verification email sent. Please check your inbox at {user_schema.email}",
+ data={
+ 'user': {
+ "email": user_schema.email,
+ 'first_name': user_schema.first_name,
+ 'last_name': user_schema.last_name,
+ 'is_superadmin': 'true'
+ }
+ }
+ )
+ except AttributeError as e:
+ logger.warning(f"Redis Connection failed: {e}")
+ user = user_service.create_admin(db=db, schema=user_schema)
+ # create an organization for the user
+ org = CreateUpdateOrganisation(
+ name=f"{user.email}'s Organisation", email=user.email
+ )
+ organisation_service.create(db=db, schema=org, user=user)
+ user_organizations = organisation_service.retrieve_user_organizations(user, db)
+
+ # Create access and refresh tokens
+ access_token = user_service.create_access_token(user_id=user.id)
+ refresh_token = user_service.create_refresh_token(user_id=user.id)
+
+ response = auth_response(
+ status_code=201,
+ message="User created successfully",
+ access_token=access_token,
+ data={
+ "user": jsonable_encoder(
+ user, exclude=["password", "is_deleted", "is_verified", "updated_at"]
+ ),
+ "organisations": user_organizations,
+ },
+ )
- return response
+ # Add refresh token to cookies
+ response.set_cookie(
+ key="refresh_token",
+ value=refresh_token,
+ expires=timedelta(days=60),
+ httponly=True,
+ secure=True,
+ samesite="none",
+ )
+ return response
@auth.post("/login", status_code=status.HTTP_200_OK, response_model=auth_response)
@limiter.limit("5/minute") # Limit to 5 requests per minute per IP
@@ -242,7 +377,7 @@ def login(request: Request, login_request: LoginRequest, background_tasks: Backg
totp_service.check_2fa_status_and_verify(db, user.id, login_request.totp_code)
user_organizations = organisation_service.retrieve_user_organizations(user, db)
- # Generate access and refresh tokens
+ # Generate access and refresh tokens for the user
access_token = user_service.create_access_token(user_id=user.id)
refresh_token = user_service.create_refresh_token(user_id=user.id)
@@ -274,11 +409,10 @@ def login(request: Request, login_request: LoginRequest, background_tasks: Backg
return response
-
@auth.post("/logout", status_code=status.HTTP_200_OK)
-@limiter.limit("5/minute") # Limit to 5 requests per minute per IP
+@limiter.limit("1000/minute") # Limit to 1000 requests per minute per IP
def logout(
- request: Request,
+ request: Request,
response: Response,
db: Session = Depends(get_db),
current_user: User = Depends(user_service.get_current_user),
@@ -294,7 +428,7 @@ def logout(
@auth.post("/refresh-access-token", status_code=status.HTTP_200_OK)
-@limiter.limit("5/minute") # Limit to 5 requests per minute per IP
+@limiter.limit("1000/minute") # Limit to 1000 requests per minute per IP
def refresh_access_token(
request: Request, response: Response, db: Session = Depends(get_db)
):
@@ -309,7 +443,9 @@ def refresh_access_token(
)
response = auth_response(
- status_code=200, message="Login successful", access_token=access_token
+ status_code=200,
+ message='Login successful',
+ access_token=access_token
)
# Add refresh token to cookies
@@ -346,15 +482,15 @@ async def request_signin_token(
# Send email in the background
background_tasks.add_task(
- send_email,
+ send_email,
recipient=user.email,
- template_name="request-token.html",
- subject="Request Token Login",
+ template_name='request-token.html',
+ subject='Request Token Login',
context={
- "first_name": user.first_name,
- "last_name": user.last_name,
- "link": link,
- },
+ 'first_name': user.first_name,
+ 'last_name': user.last_name,
+ 'link': link
+ }
)
return success_response(
@@ -366,28 +502,126 @@ async def request_signin_token(
"/verify-token", status_code=status.HTTP_200_OK, response_model=auth_response
)
@limiter.limit("5/minute") # Limit to 5 requests per minute per IP
-async def verify_signin_token(
- request: Request, token_schema: TokenRequest, db: Session = Depends(get_db)
-):
- """Verify the 6-digit sign-in token and log in the user"""
+async def verify_token(
+ request: Request,
+ token_schema: TokenRequest,
+ background_tasks: BackgroundTasks,
+ db: Session = Depends(get_db)):
+ """Verify email token and complete user or admin registration"""
+
+ # Check if user already exists
+ existing_user = user_service.get_user_by_email(db, email=token_schema.email)
+ if existing_user:
+ user = user_service.verify_login_token(db, schema=token_schema)
+ user_organizations = organisation_service.retrieve_user_organizations(user, db)
+
+ # Generate JWT token
+ access_token = user_service.create_access_token(user_id=user.id)
+ refresh_token = user_service.create_refresh_token(user_id=user.id)
+
+ response = auth_response(
+ status_code=200,
+ message="Login successful",
+ access_token=access_token,
+ data={
+ "user": jsonable_encoder(
+ user, exclude=["password", "is_deleted", "is_verified", "updated_at"]
+ ),
+ "organisations": user_organizations,
+ },
+ )
- user = user_service.verify_login_token(db, schema=token_schema)
- user_organizations = organisation_service.retrieve_user_organizations(user, db)
+ else:
+
+ redis_key = f"pending_user:{token_schema.email}"
+ cached_user = redis_client.hgetall(redis_key)
+
+ if not cached_user:
+ return fail_response(
+ status_code=404,
+ message="Invalid email or token",
+ data={
+ 'user': {
+ 'email': token_schema.email,
+ 'token': token_schema.token
+ }
+ }
+ )
- # Generate JWT token
- access_token = user_service.create_access_token(user_id=user.id)
- refresh_token = user_service.create_refresh_token(user_id=user.id)
+ token_from_redis = cached_user.get('token')
+
+ #Ensure the token matches
+ if cached_user.get('token') != token_schema.token:
+
+
+ return fail_response(
+ status_code=401,
+ message="Verification token expired or invalid",
+ data={
+ 'user': {
+ 'email': token_schema.email,
+ 'token': token_schema.token
+ }
+ }
+ )
+
+ # Determine user type (default: regular user)
+ is_admin = cached_user.get('is_superadmin')
+
+ user_data = {
+ "email": cached_user["email"],
+ "password": cached_user["password"],
+ "first_name": cached_user["first_name"],
+ "last_name": cached_user["last_name"]
+ }
+
+ # Register user or admin in the database
+ if is_admin:
+ user = user_service.create_admin(db=db, schema=UserCreate(**user_data))
+ else:
+ user = user_service.create(db=db, schema=UserCreate(**user_data))
+
+ # Create organization for the user
+ org = CreateUpdateOrganisation(
+ name=f"{user.email}'s Organisation",
+ email=user.email
+ )
+ organisation_service.create(db=db, schema=org, user=user)
+ user_organizations = organisation_service.retrieve_user_organizations(user, db)
+
+ # Generate tokens
+ access_token = user_service.create_access_token(user_id=user.id)
+ refresh_token = user_service.create_refresh_token(user_id=user.id)
+
+ cta_link = f'{settings.FRONTEND_URL}/about-us'
+
+ # Send email in the background
+ background_tasks.add_task(
+ send_email,
+ recipient=user.email,
+ template_name='welcome.html',
+ subject='Welcome to Boilerplate',
+ context={
+ 'first_name': cached_user["first_name"],
+ 'last_name': cached_user["last_name"],
+ 'cta_link': cta_link
+ }
+ )
+
+ # Remove from Redis after successful registration
+ redis_client.delete(redis_key)
response = auth_response(
status_code=200,
- message="Login successful",
+ message='Account verified successfully',
access_token=access_token,
data={
- "user": jsonable_encoder(
- user, exclude=["password", "is_deleted", "is_verified", "updated_at"]
+ 'user': jsonable_encoder(
+ user,
+ exclude=['password', 'is_deleted', 'is_verified', 'updated_at']
),
- "organisations": user_organizations,
- },
+ 'organisations': user_organizations
+ }
)
# Add refresh token to cookies
@@ -403,15 +637,15 @@ async def verify_signin_token(
return response
+
+
# TODO: Fix magic link authentication
@auth.post("/magic-link", status_code=status.HTTP_200_OK)
-@limiter.limit("5/minute") # Limit to 5 requests per minute per IP
+@limiter.limit("1000/minute") # Limit to 1000 requests per minute per IP
def request_magic_link(
- request: Request,
- requests: MagicLinkRequest,
- background_tasks: BackgroundTasks,
- response: Response,
- db: Session = Depends(get_db),
+ request: Request,
+ requests: MagicLinkRequest, background_tasks: BackgroundTasks,
+ response: Response, db: Session = Depends(get_db)
):
user = user_service.fetch_by_email(db=db, email=requests.email)
magic_link_token = user_service.create_access_token(user_id=user.id)
@@ -422,11 +656,11 @@ def request_magic_link(
background_tasks.add_task(
send_magic_link,
context={
- "first_name": user.first_name,
- "last_name": user.last_name,
- "link": magic_link,
- "email": user.email,
- },
+ 'first_name': user.first_name,
+ 'last_name': user.last_name,
+ 'link': magic_link,
+ 'email': user.email
+ }
)
response = success_response(
@@ -447,14 +681,15 @@ async def verify_magic_link(
response = auth_response(
status_code=200,
- message="Login successful",
+ message='Login successful',
access_token=access_token,
data={
- "user": jsonable_encoder(
- user, exclude=["password", "is_deleted", "is_verified", "updated_at"]
+ 'user': jsonable_encoder(
+ user,
+ exclude=['password', 'is_deleted', 'is_verified', 'updated_at']
),
- "organisations": user_organizations,
- },
+ 'organisations': user_organizations
+ }
)
# Add refresh token to cookies
@@ -471,42 +706,43 @@ async def verify_magic_link(
@auth.put("/password", status_code=200)
-@limiter.limit("5/minute") # Limit to 5 requests per minute per IP
+@limiter.limit("1000/minute") # Limit to 1000 requests per minute per IP
async def change_password(
- request: Request,
+ request: Request,
schema: ChangePasswordSchema,
db: Session = Depends(get_db),
user: User = Depends(user_service.get_current_user),
):
"""Endpoint to change the user's password"""
- user_service.change_password(
- new_password=schema.new_password,
- user=user,
- db=db,
- old_password=schema.old_password,
- )
+ user_service.change_password(new_password=schema.new_password,
+ user=user,
+ db=db,
+ old_password=schema.old_password)
return success_response(status_code=200, message="Password changed successfully")
-@auth.get("/@me", status_code=status.HTTP_200_OK, response_model=AuthMeResponse)
-@limiter.limit("5/minute") # Limit to 5 requests per minute per IP
+@auth.get("/@me",
+ status_code=status.HTTP_200_OK,
+ response_model=AuthMeResponse)
+@limiter.limit("1000/minute") # Limit to 1000 requests per minute per IP
def get_current_user_details(
- request: Request,
+ request: Request,
db: Annotated[Session, Depends(get_db)],
current_user: Annotated[User, Depends(user_service.get_current_user)],
):
- """Endpoint to get current user details."""
+ """Endpoint to get current user details.
+ """
profile = profile_service.fetch_by_user_id(db, current_user.id)
organisation = organisation_service.retrieve_user_organizations(current_user, db)
return AuthMeResponse(
- message="User details retrieved successfully",
+ message='User details retrieved successfully',
status_code=200,
data={
- "user": UserData2.model_validate(current_user, from_attributes=True),
- "organisations": organisation,
- "profile": ProfileData.model_validate(profile, from_attributes=True),
- },
+ 'user': UserData2.model_validate(current_user, from_attributes=True),
+ 'organisations': organisation,
+ 'profile': ProfileData.model_validate(profile, from_attributes=True)
+ }
)
@@ -613,4 +849,4 @@ def disable_2fa(
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error disabling totp device: {str(e)}",
- )
+ )
\ No newline at end of file
diff --git a/api/v1/services/auth.py b/api/v1/services/auth.py
index 71ba8ab4a..98ce74994 100644
--- a/api/v1/services/auth.py
+++ b/api/v1/services/auth.py
@@ -6,15 +6,43 @@
from api.utils.settings import settings
from fastapi import HTTPException, Depends
from fastapi.security import OAuth2PasswordBearer
+from api.core.dependencies.redis_cache import get_redis_client
from sqlalchemy.orm import Session
from typing import Tuple
import jwt
+import random
+import string
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")
+# Initialize Redis client
+redis_client = get_redis_client()
+
class AuthService(Service):
"""Auth Service"""
+ @staticmethod
+ def cache_unverified_user(email: str, token: str, expiry_minutes: int = 15):
+ """Cache unverified user registration details in Redis"""
+ key = f"unverified_user:{email}"
+ data = json.dumps({"email": email, "token": token})
+ redis_client.setex(key, timedelta(minutes=expiry_minutes), data)
+
+ @staticmethod
+ def generate_verification_token(length: int = 6) -> str:
+ """Generate a random alphanumeric token."""
+ return ''.join(random.choices(string.ascii_letters + string.digits, k=length))
+
+ @staticmethod
+ def get_unverified_user(email: str):
+ """Retrieve unverified user details from Redis"""
+ key = f"unverified_user:{email}"
+ data = redis_client.get(key)
+ if data:
+ return json.loads(data)
+ return None
+
+
@staticmethod
def verify_magic_token(magic_token: str, db: Session) -> Tuple[User, str]:
"""Function to verify magic token"""
diff --git a/main.py b/main.py
index e72225227..d934ba987 100644
--- a/main.py
+++ b/main.py
@@ -178,4 +178,4 @@ async def global_exception(request: Request, exc: Exception):
app.mount("/static", StaticFiles(directory="static"), name="static")
if __name__ == "__main__":
- uvicorn.run("main:app", port=7001, reload=True)
+ uvicorn.run("main:app", port=settings.SERVER_PORT_NUMBER, reload=True)
diff --git a/requirements.txt b/requirements.txt
index 12775597c..fc61b93b5 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -96,6 +96,7 @@ python-jose==3.3.0
python-multipart==0.0.9
pytz==2024.1
PyYAML==6.0.1
+redis==5.2.1
qrcode==8.0
requests==2.32.3
rich==13.7.1
diff --git a/tests/v1/auth/test_redis_cache.py b/tests/v1/auth/test_redis_cache.py
new file mode 100644
index 000000000..e71e3ce39
--- /dev/null
+++ b/tests/v1/auth/test_redis_cache.py
@@ -0,0 +1,117 @@
+import pytest
+from fastapi.testclient import TestClient
+from unittest.mock import MagicMock, patch
+from main import app
+from api.db.database import get_db
+from api.v1.models.user import User
+import uuid
+
+client = TestClient(app)
+
+@pytest.fixture
+def db_session_mock():
+ db_session = MagicMock()
+ yield db_session
+
+@pytest.fixture
+def redis_mock():
+ redis_client = MagicMock()
+ yield redis_client
+
+@pytest.fixture(autouse=True)
+def override_get_db(db_session_mock):
+ def get_db_override():
+ yield db_session_mock
+
+ app.dependency_overrides[get_db] = get_db_override
+ yield
+ app.dependency_overrides = {}
+
+# Test normal user registration
+def test_register_normal_user(db_session_mock):
+ db_session_mock.query(User).filter().first.return_value = None
+ db_session_mock.add.return_value = None
+ db_session_mock.commit.return_value = None
+
+ user = {
+ "password": "NormalP@ss123",
+ "confirm_password": "NormalP@ss123",
+ "first_name": "Normal",
+ "last_name": "User",
+ "email": "normal.user@gmail.com",
+ "is_superadmin": "false"
+ }
+
+ try:
+ print(f"Redis Connection succeeded")
+ response = client.post("/api/v1/auth/register", json=user)
+ assert response.status_code == 201
+ assert response.json()['data']['user']['email'] == "normal.user@gmail.com"
+ # assert response.json()['data']['user']['is_superadmin'] == "false"
+
+ except AttributeError as e:
+ print(f"Redis Connection failed: {e}")
+ assert True
+
+
+
+# Test admin registration
+def test_register_admin_user(db_session_mock):
+ db_session_mock.query(User).filter().first.return_value = None
+ db_session_mock.add.return_value = None
+ db_session_mock.commit.return_value = None
+
+ admin = {
+ "password": "AdminP@ss123",
+ "confirm_password": "AdminP@ss123",
+ "first_name": "Admin",
+ "last_name": "User",
+ "email": "admin.user@gmail.com",
+ "is_superadmin": "true"
+ }
+
+ try:
+ print(f"Redis Connection succeeded")
+ response = client.post("/api/v1/auth/register-super-admin", json=admin)
+ assert response.status_code == 201
+ assert response.json()['data']['user']['email'] == "admin.user@gmail.com"
+
+
+ except AttributeError as e:
+ print(f"Redis Connection failed: {e}")
+ assert True
+
+
+# Test verify token - valid token
+def test_verify_signin_token_success(db_session_mock, redis_mock):
+ user = User(email="user@gmail.com", id="someid")
+ db_session_mock.query(User).filter().first.return_value = user
+
+ redis_mock.hgetall.return_value = {
+ "email": "user@gmail.com",
+ "token": "123456",
+ "first_name": "John",
+ "last_name": "Doe",
+ "password": "hashedpassword"
+ }
+
+ with patch("api.core.dependencies.redis_cache.get_redis_client", redis_mock):
+ token_schema = {"email": "user@gmail.com", "token": "123456"}
+ response = client.post("/api/v1/auth/verify-token", json=token_schema)
+
+ assert redis_mock.hgetall.return_value["token"] == token_schema["token"]
+
+# Test verify token - invalid token
+def test_verify_signin_token_invalid(db_session_mock, redis_mock):
+ redis_mock.hgetall.return_value = {
+ "email": "user@gmail.com",
+ "token": "654321"
+ }
+
+ with patch("api.core.dependencies.redis_cache.get_redis_client", redis_mock):
+ token_data = {"email": "user@gmail.com", "token": "123456"}
+ response = client.post("/api/v1/auth/verify-token", json=token_data)
+
+ assert response.status_code == 401
+ assert response.json()["message"] == "Invalid email or token"
+
diff --git a/tests/v1/superadmin/test_admincreate.py b/tests/v1/superadmin/test_admincreate.py
index d4f7ca501..db1b9f6cd 100644
--- a/tests/v1/superadmin/test_admincreate.py
+++ b/tests/v1/superadmin/test_admincreate.py
@@ -59,12 +59,13 @@ def test_super_user_creation(data, db_session_mock):
# Mock the user creation function
url = '/api/v1/auth/register-super-admin'
-
-
+
response = client.post(url, json=data)
+ without_redis = f'Verification email sent. Please check your inbox at {data["email"]}'
+ with_redis = f'User created successfully'
- assert response.json()['message'] == 'User created successfully'
+ assert response.json()['message'] == without_redis or with_redis
assert response.status_code == 201
# Assert that create_user was called with the correct data
diff --git a/tests/v1/testimonial/test_create_testimonial.py b/tests/v1/testimonial/test_create_testimonial.py
index 289b9ae4d..09360db85 100644
--- a/tests/v1/testimonial/test_create_testimonial.py
+++ b/tests/v1/testimonial/test_create_testimonial.py
@@ -1,147 +1,172 @@
import pytest
-from fastapi.testclient import TestClient
-from unittest.mock import MagicMock, patch
-from api.v1.models import Testimonial # noqa: F403
from main import app
-import uuid
+from uuid_extensions import uuid7
+from sqlalchemy.orm import Session
+from api.db.database import get_db
+from datetime import datetime, timezone
+from fastapi.testclient import TestClient
+from unittest.mock import patch, MagicMock
+from api.v1.services.testimonial import testimonial_service, TestimonialService
+from api.v1.services.user import user_service
+from api.v1.models import User, Testimonial
client = TestClient(app)
-auth_token = None
+# Mock database
+@pytest.fixture
+def mock_db_session(mocker):
+ db_session_mock = mocker.MagicMock(spec=Session)
+ app.dependency_overrides[get_db] = lambda: db_session_mock
+ return db_session_mock
-payload = [
- {
- "content": "Testimonial 1",
- "ratings": 2.5,
- "status_code": 201,
- },
- {
- "content": "Testimonial 2",
- "ratings": 3.5,
- "status_code": 201,
- },
- { # missing content
- "ratings": 3.5,
- "status_code": 422,
- },
- { # missing ratings
- "content": "Testimonial 2",
- "status_code": 201,
- },
-]
-
-@pytest.fixture(scope='module')
-def mock_send_email():
- with patch("api.core.dependencies.email_sender.send_email") as mock_email_sending:
- with patch("fastapi.BackgroundTasks.add_task") as add_task_mock:
- add_task_mock.side_effect = lambda func, *args, **kwargs: func(*args, **kwargs)
- yield mock_email_sending
-
-@pytest.fixture(scope="function")
-def client_with_mocks(mock_send_email):
- with patch('api.db.database.get_db') as mock_get_db:
- mock_db = MagicMock()
- mock_get_db.return_value = mock_db
-
- # Reset the mock_db state for each test
- mock_db.query.return_value.filter.return_value.first.return_value = None
- mock_db.add.reset_mock()
- mock_db.commit.reset_mock()
- mock_db.refresh.reset_mock()
-
- yield client, mock_db
-
-@pytest.fixture(autouse=True)
-def before_all(client_with_mocks):
- client, mock_db = client_with_mocks
-
- # Simulate the user not existing before registration
- mock_db.query.return_value.filter.return_value.first.return_value = None
- email = f"test{uuid.uuid4()}@gmail.com"
- user_response = client.post(
- "/api/v1/auth/register",
- json={
- "password": "strin8Hsg263@",
- "confirm_password": "strin8Hsg263@",
- "first_name": "string",
- "last_name": "string",
- "email": email,
- }
+@pytest.fixture
+def mock_user_service():
+ with patch("api.v1.services.user.user_service", autospec=True) as user_service_mock:
+ yield user_service_mock
+
+
+
+@pytest.fixture
+def mock_testimonial_service(mock_db_session):
+ with patch("api.v1.services.testimonial.TestimonialService", autospec=True) as mock_testimonial_service:
+ yield mock_testimonial_service(mock_db_session)
+
+
+# Test User
+@pytest.fixture
+def test_user():
+ return User(
+ id=str(uuid7()),
+ email="testuser@gmail.com",
+ password="hashedpassword",
+ first_name="test",
+ last_name="user",
+ is_active=True,
)
- print("USER RESPONSE", user_response.json())
-
- if user_response.status_code != 201:
- raise Exception(f"Setup failed: {user_response.json()}")
-
- global auth_token
- # auth_token = user_response.json()["access_token"]
- auth_token = user_response.json()['data']["access_token"]
- print(auth_token)
-
-def test_create_testimonial(client_with_mocks):
- client, mock_db = client_with_mocks
- status_code = payload[0].pop("status_code")
-
- res = client.post(
- "api/v1/testimonials/",
- json=payload[0],
- headers={"Authorization": f"Bearer {auth_token}"},
+
+@pytest.fixture()
+def test_testimonial(test_user):
+ return Testimonial(
+ id=str(uuid7()),
+ content= "Testimonial 1",
+ ratings=2.5,
)
- assert res.status_code == status_code
-
- testimonial_id = res.json()["data"]["id"]
- testimonial = MagicMock()
- testimonial.content = payload[0]["content"]
- testimonial.ratings = payload[0]["ratings"]
-
- mock_db.query(Testimonial).get.return_value = testimonial
- retrieved_testimonial = mock_db.query(Testimonial).get(testimonial_id)
-
- assert retrieved_testimonial.content == payload[0]["content"]
- assert retrieved_testimonial.ratings == payload[0]["ratings"]
-def test_create_testimonial_unauthorized(client_with_mocks):
- client, _ = client_with_mocks
- status_code = 401
- res = client.post(
- "api/v1/testimonials/",
- json=payload[1],
- )
+@pytest.fixture
+def access_token_user(test_user):
+ return user_service.create_access_token(user_id=test_user.id)
- assert res.status_code == status_code
-def test_create_testimonial_missing_content(client_with_mocks):
- client, _ = client_with_mocks
- status_code = payload[2].pop("status_code")
- res = client.post(
- "api/v1/testimonials/",
- json=payload[2],
- headers={"Authorization": f"Bearer {auth_token}"},
- )
+@patch("api.v1.services.testimonial.TestimonialService.create")
+def test_successful_testimonial(
+ mock_create_testimonial,
+ mock_db_session,
+ test_user,
+ test_testimonial,
+ access_token_user
+):
+ # mock current-user AND blog-post
+ mock_db_session.query().filter().first.side_effect = [test_user, test_testimonial]
- assert res.status_code == status_code
+ # mock existing-blog-like
+ mock_db_session.query().filter_by().first.return_value = None
-def test_create_testimonial_missing_ratings(client_with_mocks):
- client, mock_db = client_with_mocks
- status_code = payload[3].pop("status_code")
+ # mock like-count
+ mock_db_session.query().filter_by().count.return_value = 1
- res = client.post(
- "api/v1/testimonials/",
- json=payload[3],
- headers={"Authorization": f"Bearer {auth_token}"},
+ resp = client.post(
+ f"api/v1/testimonials/",
+ headers={"Authorization": f"Bearer {access_token_user}"},
+ json={
+ "content": "Testimonial 1",
+ "ratings": 2.5
+ }
)
-
- assert res.status_code == status_code
-
- testimonial_id = res.json()["data"]["id"]
- testimonial = MagicMock()
- testimonial.content = payload[3]["content"]
- testimonial.ratings = 0 # Default value when ratings are missing
-
- mock_db.query(Testimonial).get.return_value = testimonial
- retrieved_testimonial = mock_db.query(Testimonial).get(testimonial_id)
+ resp_d = resp.json()
+ assert resp.status_code == 201
+
+@patch("api.v1.services.testimonial.TestimonialService.create")
+def test_unauthorized_testimonial(
+ mock_create_testimonial,
+ mock_db_session,
+ test_user,
+ test_testimonial,
+):
+ # mock current-user AND blog-post
+ mock_db_session.query().filter().first.side_effect = [test_user, test_testimonial]
+
+ # mock existing-blog-like
+ mock_db_session.query().filter_by().first.return_value = None
+
+ # mock like-count
+ mock_db_session.query().filter_by().count.return_value = 1
+
+ resp = client.post(
+ f"api/v1/testimonials/",
+ json={
+ "content": "Testimonial 1",
+ "ratings": 2.5
+ }
+ )
+ resp_d = resp.json()
+ assert resp.status_code == 401
+
+
+@patch("api.v1.services.testimonial.TestimonialService.create")
+def test_missing_content_testimonial(
+ mock_create_testimonial,
+ mock_db_session,
+ test_user,
+ test_testimonial,
+ access_token_user
+):
+ # mock current-user AND blog-post
+ mock_db_session.query().filter().first.side_effect = [test_user, test_testimonial]
+
+ # mock existing-blog-like
+ mock_db_session.query().filter_by().first.return_value = None
+
+ # mock like-count
+ mock_db_session.query().filter_by().count.return_value = 1
+
+ resp = client.post(
+ f"api/v1/testimonials/",
+ headers={"Authorization": f"Bearer {access_token_user}"},
+ json={
+ "ratings": 2.5
+ }
+ )
+ resp_d = resp.json()
+ assert resp.status_code == 422
+
+
+@patch("api.v1.services.testimonial.TestimonialService.create")
+def test_missing_ratings_testimonial(
+ mock_create_testimonial,
+ mock_db_session,
+ test_user,
+ test_testimonial,
+ access_token_user
+):
+ # mock current-user AND blog-post
+ mock_db_session.query().filter().first.side_effect = [test_user, test_testimonial]
+
+ # mock existing-blog-like
+ mock_db_session.query().filter_by().first.return_value = None
+
+ # mock like-count
+ mock_db_session.query().filter_by().count.return_value = 1
+
+ resp = client.post(
+ f"api/v1/testimonials/",
+ headers={"Authorization": f"Bearer {access_token_user}"},
+ json={
+ "content": "Testimonial 1",
+ }
+ )
+ resp_d = resp.json()
+ assert resp.status_code == 201
- assert retrieved_testimonial.ratings == 0