Skip to content
This repository was archived by the owner on Jun 19, 2026. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,17 @@ finmind/
- Logs are emitted as JSON with `request_id` and shipped to Loki via Promtail.
- Pre-provisioned Grafana dashboard: `FinMind Operations and KPI`.

## Contribution Policy
## 📚 Documentation
Please see our `/docs` for full OpenAPI schemas.

## 🪝 Webhooks
FinMind emits signed webhooks (HMAC SHA-256) for key events.
You can register an endpoint via `POST /webhooks` and listen to the following events:
- `expense.created`: Emitted when a new expense is logged.

Each webhook contains an `X-FinMind-Signature` header for verification. Webhooks will automatically retry up to 5 times using exponential backoff in case of delivery failure.

## 🤝 Contribution Policy
- See `CONTRIBUTING.md` for fork-first contribution flow and PR requirements.

## Notes on Free-Tier Reminders
Expand Down
11 changes: 11 additions & 0 deletions packages/backend/app/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import os
import logging
from datetime import timedelta
from apscheduler.schedulers.background import BackgroundScheduler


def create_app(settings: Settings | None = None) -> Flask:
Expand Down Expand Up @@ -56,6 +57,16 @@ def create_app(settings: Settings | None = None) -> Flask:
with app.app_context():
_ensure_schema_compatibility(app)

# Initialize APScheduler for webhooks
scheduler = BackgroundScheduler()
def job_process_webhooks():
with app.app_context():
from .services.webhook_service import process_pending_webhooks
process_pending_webhooks()

scheduler.add_job(func=job_process_webhooks, trigger="interval", seconds=60)
scheduler.start()

@app.before_request
def _before_request():
init_request_context()
Expand Down
24 changes: 24 additions & 0 deletions packages/backend/app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,27 @@ class AuditLog(db.Model):
user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=True)
action = db.Column(db.String(100), nullable=False)
created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)


class WebhookEndpoint(db.Model):
__tablename__ = "webhook_endpoints"
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False)
url = db.Column(db.String(500), nullable=False)
secret = db.Column(db.String(255), nullable=False)
event_types = db.Column(db.String(255), nullable=False, default="*") # comma separated or *
active = db.Column(db.Boolean, default=True, nullable=False)
created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)


class WebhookDelivery(db.Model):
__tablename__ = "webhook_deliveries"
id = db.Column(db.Integer, primary_key=True)
endpoint_id = db.Column(db.Integer, db.ForeignKey("webhook_endpoints.id"), nullable=False)
event_type = db.Column(db.String(100), nullable=False)
payload = db.Column(db.Text, nullable=False)
status = db.Column(db.String(20), default="PENDING", nullable=False) # PENDING, SUCCESS, FAILED
attempts = db.Column(db.Integer, default=0, nullable=False)
last_attempt_at = db.Column(db.DateTime, nullable=True)
next_retry_at = db.Column(db.DateTime, nullable=True)
created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
2 changes: 2 additions & 0 deletions packages/backend/app/routes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from .categories import bp as categories_bp
from .docs import bp as docs_bp
from .dashboard import bp as dashboard_bp
from .webhooks import bp as webhooks_bp


def register_routes(app: Flask):
Expand All @@ -18,3 +19,4 @@ def register_routes(app: Flask):
app.register_blueprint(categories_bp, url_prefix="/categories")
app.register_blueprint(docs_bp, url_prefix="/docs")
app.register_blueprint(dashboard_bp, url_prefix="/dashboard")
app.register_blueprint(webhooks_bp, url_prefix="/webhooks")
5 changes: 4 additions & 1 deletion packages/backend/app/routes/expenses.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from ..models import Expense, RecurringCadence, RecurringExpense, User
from ..services.cache import cache_delete_patterns, monthly_summary_key
from ..services import expense_import
from ..services.webhook_service import emit_event
import logging

bp = Blueprint("expenses", __name__)
Expand Down Expand Up @@ -84,7 +85,9 @@ def create_expense():
f"insights:{uid}:*",
]
)
return jsonify(_expense_to_dict(e)), 201
expense_data = _expense_to_dict(e)
emit_event(uid, "expense.created", expense_data)
return jsonify(expense_data), 201


@bp.get("/recurring")
Expand Down
61 changes: 61 additions & 0 deletions packages/backend/app/routes/webhooks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from flask import Blueprint, request, jsonify
from flask_jwt_extended import jwt_required, get_jwt_identity
from ..extensions import db
from ..models import WebhookEndpoint
import secrets

bp = Blueprint("webhooks", __name__, url_prefix="/webhooks")

@bp.post("/")
@jwt_required()
def create_webhook():
user_id = get_jwt_identity()
data = request.json or {}
url = data.get("url")
event_types = data.get("event_types", "*")

if not url:
return jsonify(error="URL is required"), 400

endpoint = WebhookEndpoint(
user_id=user_id,
url=url,
secret=secrets.token_hex(32),
event_types=event_types
)
db.session.add(endpoint)
db.session.commit()

return jsonify({
"id": endpoint.id,
"url": endpoint.url,
"secret": endpoint.secret,
"event_types": endpoint.event_types,
"active": endpoint.active
}), 201

@bp.get("/")
@jwt_required()
def list_webhooks():
user_id = get_jwt_identity()
endpoints = WebhookEndpoint.query.filter_by(user_id=user_id).all()
return jsonify([
{
"id": ep.id,
"url": ep.url,
"event_types": ep.event_types,
"active": ep.active
} for ep in endpoints
]), 200

@bp.delete("/<int:endpoint_id>")
@jwt_required()
def delete_webhook(endpoint_id):
user_id = get_jwt_identity()
endpoint = WebhookEndpoint.query.filter_by(id=endpoint_id, user_id=user_id).first()
if not endpoint:
return jsonify(error="Endpoint not found"), 404

db.session.delete(endpoint)
db.session.commit()
return jsonify(success=True), 200
83 changes: 83 additions & 0 deletions packages/backend/app/services/webhook_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import json
import hmac
import hashlib
import requests
from datetime import datetime, timedelta
from typing import Dict, Any
from ..extensions import db
from ..models import WebhookEndpoint, WebhookDelivery

def generate_signature(payload: str, secret: str) -> str:
"""Generate HMAC SHA-256 signature for webhook payload."""
return hmac.new(
secret.encode('utf-8'),
payload.encode('utf-8'),
hashlib.sha256
).hexdigest()

def emit_event(user_id: int, event_type: str, payload_data: Dict[str, Any]):
"""Queue a webhook event for all active endpoints of a user."""
endpoints = WebhookEndpoint.query.filter_by(user_id=user_id, active=True).all()
payload_str = json.dumps(payload_data)

for ep in endpoints:
if ep.event_types == "*" or event_type in [t.strip() for t in ep.event_types.split(",")]:
delivery = WebhookDelivery(
endpoint_id=ep.id,
event_type=event_type,
payload=payload_str,
status="PENDING",
attempts=0,
next_retry_at=datetime.utcnow()
)
db.session.add(delivery)

db.session.commit()

def process_pending_webhooks():
"""Process pending webhooks. Meant to be called by a scheduler or background job."""
from flask import current_app

# We need an application context to query the db
now = datetime.utcnow()
deliveries = WebhookDelivery.query.filter(
WebhookDelivery.status == "PENDING",
WebhookDelivery.next_retry_at <= now
).all()

for d in deliveries:
endpoint = WebhookEndpoint.query.get(d.endpoint_id)
if not endpoint or not endpoint.active:
d.status = "FAILED"
continue

d.attempts += 1
d.last_attempt_at = now

signature = generate_signature(d.payload, endpoint.secret)
headers = {
"Content-Type": "application/json",
"X-FinMind-Event": d.event_type,
"X-FinMind-Signature": f"sha256={signature}"
}

try:
resp = requests.post(endpoint.url, data=d.payload, headers=headers, timeout=5)
if 200 <= resp.status_code < 300:
d.status = "SUCCESS"
else:
_handle_webhook_failure(d)
except requests.RequestException:
_handle_webhook_failure(d)

db.session.commit()

def _handle_webhook_failure(delivery: WebhookDelivery):
"""Handle retry logic using exponential backoff."""
MAX_ATTEMPTS = 5
if delivery.attempts >= MAX_ATTEMPTS:
delivery.status = "FAILED"
else:
# Exponential backoff: 1m, 5m, 25m, 125m
delay_minutes = 5 ** (delivery.attempts - 1)
delivery.next_retry_at = datetime.utcnow() + timedelta(minutes=delay_minutes)