Skip to content
This repository was archived by the owner on Jun 19, 2026. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions packages/backend/app/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,22 @@ def _ensure_schema_compatibility(app: Flask) -> None:
conn.rollback()
finally:
conn.close()

# GDPR: add ip_address column to audit_logs if missing
conn = db.engine.raw_connection()
try:
cur = conn.cursor()
cur.execute(
"""
ALTER TABLE audit_logs
ADD COLUMN IF NOT EXISTS ip_address VARCHAR(45)
"""
)
conn.commit()
except Exception:
app.logger.exception(
"Schema compatibility patch failed for audit_logs.ip_address"
)
conn.rollback()
finally:
conn.close()
1 change: 1 addition & 0 deletions packages/backend/app/db/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -121,5 +121,6 @@ CREATE TABLE IF NOT EXISTS audit_logs (
id SERIAL PRIMARY KEY,
user_id INT REFERENCES users(id) ON DELETE SET NULL,
action VARCHAR(100) NOT NULL,
ip_address VARCHAR(45),
created_at TIMESTAMP NOT NULL DEFAULT NOW()
);
1 change: 1 addition & 0 deletions packages/backend/app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,4 +132,5 @@ class AuditLog(db.Model):
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=True)
action = db.Column(db.String(100), nullable=False)
ip_address = db.Column(db.String(45), nullable=True)
created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
2 changes: 2 additions & 0 deletions packages/backend/app/routes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from .categories import bp as categories_bp
from .docs import bp as docs_bp
from .dashboard import bp as dashboard_bp
from .gdpr import bp as gdpr_bp


def register_routes(app: Flask):
Expand All @@ -18,3 +19,4 @@ def register_routes(app: Flask):
app.register_blueprint(categories_bp, url_prefix="/categories")
app.register_blueprint(docs_bp, url_prefix="/docs")
app.register_blueprint(dashboard_bp, url_prefix="/dashboard")
app.register_blueprint(gdpr_bp, url_prefix="/gdpr")
82 changes: 82 additions & 0 deletions packages/backend/app/routes/gdpr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
"""GDPR-compliant PII export and deletion routes.

Endpoints:
GET /gdpr/users/<id>/export – download all personal data as JSON
DELETE /gdpr/users/<id> – irreversibly delete user and all data
"""
from flask import Blueprint, jsonify, request
from flask_jwt_extended import jwt_required, get_jwt_identity
from ..extensions import db
from ..models import User, AuditLog
from ..services.gdpr import collect_user_data, permanently_delete_user, log_audit_action

bp = Blueprint("gdpr", __name__)


@bp.get("/users/<int:user_id>/export")
@jwt_required()
def export_user_data(user_id: int):
"""Export all PII for the authenticated user as a JSON package."""
current_uid = int(get_jwt_identity())
if current_uid != user_id:
return jsonify(error="forbidden – can only export your own data"), 403

user = db.session.get(User, user_id)
if not user:
return jsonify(error="user not found"), 404

ip_address = request.remote_addr
data = collect_user_data(user_id)

# Audit trail
log_audit_action(
user_id=user_id,
action="GDPR_DATA_EXPORT",
ip_address=ip_address,
)

return jsonify(data), 200


@bp.delete("/users/<int:user_id>")
@jwt_required()
def delete_user(user_id: int):
"""Irreversibly delete the authenticated user and all associated data.

Requires a JSON body with ``{"confirm": true}`` to prevent accidental
deletion.
"""
current_uid = int(get_jwt_identity())
if current_uid != user_id:
return jsonify(error="forbidden – can only delete your own data"), 403

user = db.session.get(User, user_id)
if not user:
return jsonify(error="user not found"), 404

# Require explicit confirmation
data = request.get_json(silent=True) or {}
if not data.get("confirm"):
return jsonify(error="confirmation required – send {\"confirm\": true}"), 400

ip_address = request.remote_addr

# Audit trail – log *before* deletion so user_id FK is still valid
log_audit_action(
user_id=user_id,
action="GDPR_DATA_DELETE",
ip_address=ip_address,
)

# Set user_id on audit logs to NULL before deleting the user,
# so the audit record survives (GDPR requires we keep evidence of deletion).
db.session.query(AuditLog).filter_by(user_id=user_id).update(
{"user_id": None}, synchronize_session="fetch"
)
db.session.commit()

success = permanently_delete_user(user_id)
if not success:
return jsonify(error="deletion failed"), 500

return jsonify(message="user and all associated data permanently deleted"), 200
119 changes: 119 additions & 0 deletions packages/backend/app/services/gdpr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
"""GDPR-compliant PII export and deletion service."""
import logging
from datetime import datetime, timezone
from ..extensions import db
from ..models import (
User,
Category,
Expense,
RecurringExpense,
Bill,
Reminder,
AdImpression,
UserSubscription,
AuditLog,
)

logger = logging.getLogger("finmind.gdpr")


def collect_user_data(user_id: int) -> dict:
"""Collect all PII associated with a user for export.

Returns a serialisable dict containing every piece of personal data
stored for the given user.
"""
user = db.session.get(User, user_id)
if not user:
return {}

def _model_to_dict(instance):
"""Convert a SQLAlchemy model instance to a plain dict."""
return {
c.key: getattr(instance, c.key)
for c in db.inspect(instance).mapper.column_attrs
}

def _serialize_values(d):
"""Ensure all values are JSON-serialisable."""
out = {}
for k, v in d.items():
if isinstance(v, datetime):
out[k] = v.isoformat()
elif hasattr(v, "isoformat"): # date objects
out[k] = v.isoformat()
elif isinstance(v, (int, float, str, bool)) or v is None:
out[k] = v
else:
out[k] = str(v)
return out

categories = db.session.query(Category).filter_by(user_id=user_id).all()
expenses = db.session.query(Expense).filter_by(user_id=user_id).all()
recurring = db.session.query(RecurringExpense).filter_by(user_id=user_id).all()
bills = db.session.query(Bill).filter_by(user_id=user_id).all()
reminders = db.session.query(Reminder).filter_by(user_id=user_id).all()
ad_impressions = db.session.query(AdImpression).filter_by(user_id=user_id).all()
subscriptions = db.session.query(UserSubscription).filter_by(user_id=user_id).all()

return {
"export_timestamp": datetime.now(timezone.utc).isoformat(),
"user": _serialize_values(_model_to_dict(user)),
"categories": [_serialize_values(_model_to_dict(c)) for c in categories],
"expenses": [_serialize_values(_model_to_dict(e)) for e in expenses],
"recurring_expenses": [_serialize_values(_model_to_dict(r)) for r in recurring],
"bills": [_serialize_values(_model_to_dict(b)) for b in bills],
"reminders": [_serialize_values(_model_to_dict(r)) for r in reminders],
"ad_impressions": [_serialize_values(_model_to_dict(a)) for a in ad_impressions],
"subscriptions": [_serialize_values(_model_to_dict(s)) for s in subscriptions],
}


def permanently_delete_user(user_id: int) -> bool:
"""Irreversibly delete all data associated with the given user.

This operation cannot be undone. All related records across every table
are deleted, and finally the user row itself is removed.

Returns True if the user existed and was deleted, False otherwise.
"""
user = db.session.get(User, user_id)
if not user:
return False

# Delete related records in dependency order
# (Some tables have FK ON DELETE CASCADE, but we are explicit for safety,
# especially since SQLite tests may not enforce FK constraints.)
db.session.query(Reminder).filter_by(user_id=user_id).delete()
db.session.query(UserSubscription).filter_by(user_id=user_id).delete()
db.session.query(Bill).filter_by(user_id=user_id).delete()
db.session.query(Expense).filter_by(user_id=user_id).delete()
db.session.query(RecurringExpense).filter_by(user_id=user_id).delete()
db.session.query(Category).filter_by(user_id=user_id).delete()
# Ad impressions: user_id is nullable, set to NULL rather than delete
db.session.query(AdImpression).filter_by(user_id=user_id).update(
{"user_id": None}, synchronize_session="fetch"
)

# Finally delete the user row
db.session.delete(user)
db.session.commit()
logger.info("Permanently deleted user_id=%s and all associated data", user_id)
return True


def log_audit_action(user_id: int | None, action: str, ip_address: str | None = None) -> None:
"""Record a GDPR audit log entry for export/delete operations."""
entry = AuditLog(
user_id=user_id,
action=action,
ip_address=ip_address,
)
db.session.add(entry)
db.session.commit()
logger.info(
"GDPR audit log: user_id=%s action=%s ip=%s",
user_id,
action,
ip_address,
)
37 changes: 37 additions & 0 deletions packages/backend/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import pytest
from unittest.mock import MagicMock
from app import create_app
from app.config import Settings
from app.extensions import db
Expand All @@ -19,6 +20,42 @@ def _setup_db(app):
db.create_all()


@pytest.fixture(autouse=True)
def _mock_redis(monkeypatch):
"""Mock the global redis_client so tests don't need a live Redis server.

Patch at every import site because modules capture the reference at
import time (``from ..extensions import redis_client``).
"""
mock = MagicMock()
mock.setex.return_value = True
mock.get.return_value = None
mock.delete.return_value = True
mock.exists.return_value = False
mock.keys.return_value = []
mock.set.return_value = True
mock.expire.return_value = True
mock.ttl.return_value = -1
mock.ping.return_value = True

# Patch the canonical location
monkeypatch.setattr("app.extensions.redis_client", mock)
# Patch every module that imported the reference
try:
monkeypatch.setattr("app.routes.auth.redis_client", mock)
except AttributeError:
pass
try:
monkeypatch.setattr("app.routes.bills.redis_client", mock)
except AttributeError:
pass
try:
monkeypatch.setattr("app.routes.reminders.redis_client", mock)
except AttributeError:
pass
yield mock


@pytest.fixture()
def app_fixture():
# Ensure a clean env for tests
Expand Down
Loading