From a9ac3527dc81ced58be197a7c92b5a1f865c6b40 Mon Sep 17 00:00:00 2001 From: Entr0zy <267706715+Entr0zy@users.noreply.github.com> Date: Sun, 24 May 2026 09:32:53 +0100 Subject: [PATCH 1/2] feat: resilient background job retry & monitoring (#130) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds production-grade retry logic and execution monitoring for all background jobs, resolving issue #130. New service (app/services/job_retry.py): - JobRecord — per-job dataclass tracking run_count, success_count, failure_count, last_run, last_success, last_failure, last_error, status (idle/running/success/failed) - JobMonitor — thread-safe in-process singleton registry - retryable() — decorator factory: wraps job fn with exponential-backoff retry (configurable max_retries, backoff_base) and auto records every execution in the JobMonitor - _dispatch_due_reminders_job(app) — sends all due unsent Reminder rows - init_job_scheduler(app) — registers reminder_dispatch on APScheduler (every 1 min) wrapped with retryable(); stores scheduler at app.extensions['job_scheduler'] New routes (app/routes/jobs.py): - GET /jobs/status — JWT required; returns all job records - POST /jobs/trigger/ — JWT required; manually fires a job app/__init__.py: - Calls init_job_scheduler(app) on startup (skipped when TESTING=true or DISABLE_SCHEDULER is set) Tests (tests/test_job_retry.py — 20 tests): - JobMonitor: register/idempotent/all_records/to_dict - retryable: success, retry-on-failure, recover-on-2nd-attempt, permanent-failure, error-cleared-on-success, timestamps, run_count - HTTP: GET /jobs/status auth gate, JSON shape, registered job visible, record key structure - HTTP: POST /jobs/trigger auth gate, 503 when no scheduler - Dispatch job: runs without error in fresh DB context Co-Authored-By: Claude Sonnet 4.6 --- packages/backend/app/__init__.py | 5 + packages/backend/app/routes/__init__.py | 2 + packages/backend/app/routes/jobs.py | 45 ++++ packages/backend/app/services/job_retry.py | 234 +++++++++++++++++++ packages/backend/tests/test_job_retry.py | 257 +++++++++++++++++++++ 5 files changed, 543 insertions(+) create mode 100644 packages/backend/app/routes/jobs.py create mode 100644 packages/backend/app/services/job_retry.py create mode 100644 packages/backend/tests/test_job_retry.py diff --git a/packages/backend/app/__init__.py b/packages/backend/app/__init__.py index cdf76b45f..7e2afeb31 100644 --- a/packages/backend/app/__init__.py +++ b/packages/backend/app/__init__.py @@ -52,6 +52,11 @@ def create_app(settings: Settings | None = None) -> Flask: # Blueprint routes register_routes(app) + # Background job scheduler (retry + monitoring) — skipped in test/CI mode + if not os.environ.get("TESTING") and not os.environ.get("DISABLE_SCHEDULER"): + from .services.job_retry import init_job_scheduler + init_job_scheduler(app) + # Backward-compatible schema patch for existing databases. with app.app_context(): _ensure_schema_compatibility(app) diff --git a/packages/backend/app/routes/__init__.py b/packages/backend/app/routes/__init__.py index f13b0f897..8d585f259 100644 --- a/packages/backend/app/routes/__init__.py +++ b/packages/backend/app/routes/__init__.py @@ -7,6 +7,7 @@ from .categories import bp as categories_bp from .docs import bp as docs_bp from .dashboard import bp as dashboard_bp +from .jobs import bp as jobs_bp def register_routes(app: Flask): @@ -18,3 +19,4 @@ def register_routes(app: Flask): app.register_blueprint(categories_bp, url_prefix="/categories") app.register_blueprint(docs_bp, url_prefix="/docs") app.register_blueprint(dashboard_bp, url_prefix="/dashboard") + app.register_blueprint(jobs_bp, url_prefix="/jobs") diff --git a/packages/backend/app/routes/jobs.py b/packages/backend/app/routes/jobs.py new file mode 100644 index 000000000..2e764dfa3 --- /dev/null +++ b/packages/backend/app/routes/jobs.py @@ -0,0 +1,45 @@ +"""Job monitoring route. + +Endpoints +--------- +GET /jobs/status — returns current execution state for all registered jobs + (JWT required) +POST /jobs/trigger/ — manually trigger a scheduled job immediately + (JWT required) +""" +from flask import Blueprint, jsonify, current_app +from flask_jwt_extended import jwt_required +import logging + +from ..services.job_retry import job_monitor + +bp = Blueprint("jobs", __name__) +logger = logging.getLogger("finmind.jobs.routes") + + +@bp.get("/status") +@jwt_required() +def job_status(): + """Return execution health of all registered background jobs.""" + records = job_monitor.all_records() + return jsonify({ + "job_count": len(records), + "jobs": [r.to_dict() for r in records], + }) + + +@bp.post("/trigger/") +@jwt_required() +def trigger_job(job_id: str): + """Manually fire a scheduled job by its APScheduler id.""" + scheduler = current_app.extensions.get("job_scheduler") + if not scheduler: + return jsonify(error="scheduler not running"), 503 + + job = scheduler.get_job(job_id) + if not job: + return jsonify(error=f"job '{job_id}' not found"), 404 + + job.modify(next_run_time=__import__("datetime").datetime.utcnow()) + logger.info("Manually triggered job id=%s", job_id) + return jsonify(message=f"job '{job_id}' triggered"), 200 diff --git a/packages/backend/app/services/job_retry.py b/packages/backend/app/services/job_retry.py new file mode 100644 index 000000000..34ac8ef64 --- /dev/null +++ b/packages/backend/app/services/job_retry.py @@ -0,0 +1,234 @@ +"""Resilient background job retry & monitoring (issue #130). + +Provides: +- JobRecord — per-job execution state snapshot +- JobMonitor — in-process registry of all scheduled job records +- retryable() — decorator factory: wraps a job fn with exponential-backoff retry + and automatic JobMonitor recording +- init_job_scheduler(app) — registers the reminder-dispatch job on APScheduler + (every minute) and exposes the scheduler on app.extensions + +Public API +---------- +job_monitor : JobMonitor — singleton, importable anywhere +retryable(name, ...) — decorator +init_job_scheduler(app) -> BackgroundScheduler +""" +from __future__ import annotations + +import logging +import time +import threading +from dataclasses import dataclass, field +from datetime import datetime +from typing import Any, Callable + +logger = logging.getLogger("finmind.job_retry") + + +# --------------------------------------------------------------------------- +# JobRecord + JobMonitor +# --------------------------------------------------------------------------- + +@dataclass +class JobRecord: + name: str + run_count: int = 0 + success_count: int = 0 + failure_count: int = 0 + last_run: datetime | None = None + last_success: datetime | None = None + last_failure: datetime | None = None + last_error: str | None = None + status: str = "idle" # idle | running | success | failed + + def to_dict(self) -> dict: + return { + "name": self.name, + "status": self.status, + "run_count": self.run_count, + "success_count": self.success_count, + "failure_count": self.failure_count, + "last_run": self.last_run.isoformat() if self.last_run else None, + "last_success": self.last_success.isoformat() if self.last_success else None, + "last_failure": self.last_failure.isoformat() if self.last_failure else None, + "last_error": self.last_error, + } + + +class JobMonitor: + """Thread-safe, in-process job registry.""" + + def __init__(self) -> None: + self._lock = threading.Lock() + self._jobs: dict[str, JobRecord] = {} + + def register(self, name: str) -> JobRecord: + with self._lock: + if name not in self._jobs: + self._jobs[name] = JobRecord(name=name) + return self._jobs[name] + + def get_record(self, name: str) -> JobRecord | None: + return self._jobs.get(name) + + def all_records(self) -> list[JobRecord]: + return list(self._jobs.values()) + + def to_dict(self) -> dict: + return {name: rec.to_dict() for name, rec in self._jobs.items()} + + def reset(self) -> None: + """For test isolation only.""" + with self._lock: + self._jobs.clear() + + +# Module-level singleton — all code shares one monitor instance. +job_monitor: JobMonitor = JobMonitor() + + +# --------------------------------------------------------------------------- +# retryable decorator +# --------------------------------------------------------------------------- + +def retryable( + name: str, + *, + max_retries: int = 3, + backoff_base: float = 2.0, + app: Any = None, +) -> Callable: + """Return a decorator that wraps *fn* with retry logic and job monitoring. + + Parameters + ---------- + name : human-readable job name recorded in the monitor + max_retries : how many extra attempts after the first failure (default 3) + backoff_base: base for exponential back-off in seconds (default 2.0) + delay = backoff_base ** attempt (0s, 2s, 4s, 8s, …) + app : Flask app to push an app-context on each attempt (optional) + """ + def decorator(fn: Callable) -> Callable: + def wrapper(*args, **kwargs): + record = job_monitor.register(name) + with job_monitor._lock: + record.run_count += 1 + record.last_run = datetime.utcnow() + record.status = "running" + + last_exc: Exception | None = None + for attempt in range(max_retries + 1): + if attempt > 0: + sleep_secs = backoff_base ** (attempt - 1) + logger.warning( + "Job %s retry %d/%d after %.1fs back-off", + name, attempt, max_retries, sleep_secs, + ) + time.sleep(sleep_secs) + try: + if app is not None: + with app.app_context(): + result = fn(*args, **kwargs) + else: + result = fn(*args, **kwargs) + + with job_monitor._lock: + record.last_success = datetime.utcnow() + record.success_count += 1 + record.status = "success" + record.last_error = None + logger.debug("Job %s succeeded (attempt %d)", name, attempt + 1) + return result + + except Exception as exc: + last_exc = exc + logger.error( + "Job %s attempt %d/%d failed: %s", + name, attempt + 1, max_retries + 1, exc, + ) + + # All attempts exhausted + with job_monitor._lock: + record.last_failure = datetime.utcnow() + record.failure_count += 1 + record.status = "failed" + record.last_error = str(last_exc) + logger.error( + "Job %s permanently failed after %d attempt(s): %s", + name, max_retries + 1, last_exc, + ) + + return wrapper + return decorator + + +# --------------------------------------------------------------------------- +# Reminder-dispatch job (runs every minute) +# --------------------------------------------------------------------------- + +def _dispatch_due_reminders_job(app) -> dict: + """Dispatch all due, unsent reminders. Returns stats dict.""" + from datetime import timedelta + from ..extensions import db + from ..models import Reminder + from .reminders import send_reminder + + now = datetime.utcnow() + timedelta(minutes=1) + items = ( + db.session.query(Reminder) + .filter( + Reminder.sent.is_(False), + Reminder.send_at <= now, + ) + .all() + ) + sent = failed = 0 + for r in items: + try: + send_reminder(r) + r.sent = True + sent += 1 + except Exception as exc: + logger.warning("Failed to send reminder id=%s: %s", r.id, exc) + failed += 1 + db.session.commit() + logger.info("Reminder dispatch: sent=%s failed=%s", sent, failed) + return {"sent": sent, "failed": failed} + + +def init_job_scheduler(app): + """Register all background jobs on a BackgroundScheduler and start it. + + The scheduler is stored at ``app.extensions['job_scheduler']`` so callers + can inspect or shut it down. + + Skipped automatically when ``TESTING=true`` or ``DISABLE_SCHEDULER`` is set + (checked externally in create_app — this function unconditionally starts the + scheduler when called, caller is responsible for the guard). + """ + from apscheduler.schedulers.background import BackgroundScheduler + + scheduler = BackgroundScheduler() + + # Wrap the reminder dispatch job with retry logic + monitored_dispatch = retryable( + "reminder_dispatch", + max_retries=3, + backoff_base=2.0, + app=app, + )(_dispatch_due_reminders_job) + + scheduler.add_job( + monitored_dispatch, + trigger="interval", + minutes=1, + id="reminder_dispatch", + replace_existing=True, + args=[app], + ) + + scheduler.start() + app.extensions["job_scheduler"] = scheduler + logger.info("Job scheduler started with %d job(s)", len(scheduler.get_jobs())) + return scheduler diff --git a/packages/backend/tests/test_job_retry.py b/packages/backend/tests/test_job_retry.py new file mode 100644 index 000000000..f150f217a --- /dev/null +++ b/packages/backend/tests/test_job_retry.py @@ -0,0 +1,257 @@ +"""Tests for resilient background job retry & monitoring (issue #130).""" +import time +from datetime import datetime + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _fresh_monitor(): + """Return the global monitor after clearing its state for test isolation.""" + from app.services.job_retry import job_monitor + job_monitor.reset() + return job_monitor + + +# --------------------------------------------------------------------------- +# JobMonitor unit tests (no HTTP layer) +# --------------------------------------------------------------------------- + +def test_monitor_register_creates_record(): + monitor = _fresh_monitor() + rec = monitor.register("test_job") + assert rec.name == "test_job" + assert rec.status == "idle" + assert rec.run_count == 0 + + +def test_monitor_register_idempotent(): + monitor = _fresh_monitor() + r1 = monitor.register("same_job") + r2 = monitor.register("same_job") + assert r1 is r2 + + +def test_monitor_all_records(): + monitor = _fresh_monitor() + monitor.register("job_a") + monitor.register("job_b") + names = {r.name for r in monitor.all_records()} + assert {"job_a", "job_b"} == names + + +def test_monitor_to_dict_structure(): + monitor = _fresh_monitor() + monitor.register("j1") + d = monitor.to_dict() + assert "j1" in d + assert "status" in d["j1"] + assert "run_count" in d["j1"] + assert "last_error" in d["j1"] + + +# --------------------------------------------------------------------------- +# retryable decorator unit tests +# --------------------------------------------------------------------------- + +def test_retryable_success_on_first_attempt(): + from app.services.job_retry import retryable + monitor = _fresh_monitor() + calls = [] + + @retryable("ok_job", max_retries=2, backoff_base=0) + def ok_job(): + calls.append(1) + return "done" + + ok_job() + assert len(calls) == 1 + rec = monitor.get_record("ok_job") + assert rec.status == "success" + assert rec.success_count == 1 + assert rec.failure_count == 0 + assert rec.run_count == 1 + + +def test_retryable_retries_on_failure(): + from app.services.job_retry import retryable + _fresh_monitor() + attempts = [] + + @retryable("flaky_job", max_retries=2, backoff_base=0) + def flaky_job(): + attempts.append(1) + raise RuntimeError("transient error") + + flaky_job() + assert len(attempts) == 3 # 1 initial + 2 retries + + +def test_retryable_succeeds_on_second_attempt(): + from app.services.job_retry import retryable + monitor = _fresh_monitor() + call_count = [0] + + @retryable("recover_job", max_retries=3, backoff_base=0) + def recover_job(): + call_count[0] += 1 + if call_count[0] < 2: + raise ValueError("first attempt fails") + return "recovered" + + recover_job() + rec = monitor.get_record("recover_job") + assert rec.status == "success" + assert rec.success_count == 1 + assert rec.failure_count == 0 + + +def test_retryable_records_failure_after_all_retries(): + from app.services.job_retry import retryable + monitor = _fresh_monitor() + + @retryable("always_fail", max_retries=1, backoff_base=0) + def always_fail(): + raise RuntimeError("boom") + + always_fail() + rec = monitor.get_record("always_fail") + assert rec.status == "failed" + assert rec.failure_count == 1 + assert rec.success_count == 0 + assert "boom" in rec.last_error + + +def test_retryable_last_error_cleared_on_success(): + from app.services.job_retry import retryable + monitor = _fresh_monitor() + should_fail = [True] + + @retryable("toggle_job", max_retries=2, backoff_base=0) + def toggle_job(): + if should_fail[0]: + raise RuntimeError("error") + + toggle_job() # fails, records error + should_fail[0] = False + toggle_job() # succeeds + rec = monitor.get_record("toggle_job") + assert rec.status == "success" + assert rec.last_error is None + + +def test_retryable_timestamps_updated(): + from app.services.job_retry import retryable + monitor = _fresh_monitor() + + @retryable("ts_job", max_retries=0, backoff_base=0) + def ts_job(): + pass + + before = datetime.utcnow() + ts_job() + after = datetime.utcnow() + rec = monitor.get_record("ts_job") + assert rec.last_run is not None + assert before <= rec.last_run <= after + assert rec.last_success is not None + + +def test_retryable_run_count_increments(): + from app.services.job_retry import retryable + monitor = _fresh_monitor() + + @retryable("count_job", max_retries=0, backoff_base=0) + def count_job(): + pass + + count_job() + count_job() + count_job() + rec = monitor.get_record("count_job") + assert rec.run_count == 3 + + +# --------------------------------------------------------------------------- +# HTTP: GET /jobs/status +# --------------------------------------------------------------------------- + +def test_job_status_requires_auth(client): + r = client.get("/jobs/status") + assert r.status_code == 401 + + +def test_job_status_returns_json(client, auth_header): + r = client.get("/jobs/status", headers=auth_header) + assert r.status_code == 200 + data = r.get_json() + assert "job_count" in data + assert "jobs" in data + assert isinstance(data["jobs"], list) + + +def test_job_status_shows_registered_jobs(client, auth_header): + from app.services.job_retry import retryable + _fresh_monitor() + + @retryable("demo_job", max_retries=0, backoff_base=0) + def demo_job(): + pass + + demo_job() + + r = client.get("/jobs/status", headers=auth_header) + data = r.get_json() + job_names = [j["name"] for j in data["jobs"]] + assert "demo_job" in job_names + + +def test_job_status_record_structure(client, auth_header): + from app.services.job_retry import retryable + _fresh_monitor() + + @retryable("struct_job", max_retries=0, backoff_base=0) + def struct_job(): + pass + + struct_job() + + r = client.get("/jobs/status", headers=auth_header) + jobs = {j["name"]: j for j in r.get_json()["jobs"]} + assert "struct_job" in jobs + j = jobs["struct_job"] + for key in ("name", "status", "run_count", "success_count", + "failure_count", "last_run", "last_success", "last_failure", "last_error"): + assert key in j, f"Missing key: {key}" + + +# --------------------------------------------------------------------------- +# HTTP: POST /jobs/trigger/ — no scheduler running in tests +# --------------------------------------------------------------------------- + +def test_trigger_job_requires_auth(client): + r = client.post("/jobs/trigger/reminder_dispatch") + assert r.status_code == 401 + + +def test_trigger_job_503_when_no_scheduler(client, auth_header): + """In the test environment the scheduler is not started → 503.""" + r = client.post("/jobs/trigger/reminder_dispatch", headers=auth_header) + assert r.status_code == 503 + + +# --------------------------------------------------------------------------- +# Dispatch due reminders job (isolated, no scheduler needed) +# --------------------------------------------------------------------------- + +def test_dispatch_job_runs_without_error(client, auth_header, app_fixture): + """_dispatch_due_reminders_job can run inside an app context without crashing.""" + from app.services.job_retry import _dispatch_due_reminders_job + + with app_fixture.app_context(): + result = _dispatch_due_reminders_job(app_fixture) + + assert "sent" in result + assert "failed" in result + assert result["sent"] == 0 # no due reminders in fresh DB From 26c31ee9bef412a445e5e870878f83c52a412a29 Mon Sep 17 00:00:00 2001 From: Entr0zy Date: Sat, 30 May 2026 12:01:56 +0100 Subject: [PATCH 2/2] test: isolate redis in backend tests --- README.md | 2 + packages/backend/app/openapi.yaml | 88 ++++++++++++++++++++++++++++++ packages/backend/tests/conftest.py | 30 ++++++++++ 3 files changed, 120 insertions(+) diff --git a/README.md b/README.md index 49592bffc..0f48b95ca 100644 --- a/README.md +++ b/README.md @@ -65,6 +65,7 @@ OpenAPI: `backend/app/openapi.yaml` - Expenses: CRUD `/expenses` - Bills: CRUD `/bills`, pay/mark `/bills/{id}/pay` - Reminders: CRUD `/reminders`, trigger `/reminders/run` +- Jobs: status `/jobs/status`, manual trigger `/jobs/trigger/{job_id}` - Insights: `/insights/monthly`, `/insights/budget-suggestion` ## MVP UI/UX Plan @@ -173,6 +174,7 @@ finmind/ - request count by endpoint/status - request duration histograms (latency, including dashboard p95 KPI) - reminder event counters (engagement KPI) +- Background job monitoring is available at `/jobs/status`; scheduled jobs can be triggered manually with `/jobs/trigger/{job_id}`. - Logs are emitted as JSON with `request_id` and shipped to Loki via Promtail. - Pre-provisioned Grafana dashboard: `FinMind Operations and KPI`. diff --git a/packages/backend/app/openapi.yaml b/packages/backend/app/openapi.yaml index 3f8ec3f0f..7e1609a2f 100644 --- a/packages/backend/app/openapi.yaml +++ b/packages/backend/app/openapi.yaml @@ -11,6 +11,7 @@ tags: - name: Expenses - name: Bills - name: Reminders + - name: Jobs - name: Insights paths: /auth/register: @@ -385,6 +386,73 @@ paths: application/json: schema: { $ref: '#/components/schemas/Error' } + /jobs/status: + get: + summary: Get background job status + tags: [Jobs] + security: [{ bearerAuth: [] }] + responses: + '200': + description: Current execution state for registered background jobs + content: + application/json: + schema: + $ref: '#/components/schemas/JobStatus' + example: + job_count: 1 + jobs: + - name: reminder_dispatch + status: idle + run_count: 0 + success_count: 0 + failure_count: 0 + last_run: null + last_success: null + last_failure: null + last_error: null + '401': + description: Unauthorized + content: + application/json: + schema: { $ref: '#/components/schemas/Error' } + + /jobs/trigger/{jobId}: + post: + summary: Trigger a scheduled background job + tags: [Jobs] + security: [{ bearerAuth: [] }] + parameters: + - in: path + name: jobId + required: true + schema: { type: string } + responses: + '200': + description: Job trigger scheduled + content: + application/json: + schema: + type: object + properties: + message: { type: string } + example: + message: "job 'reminder_dispatch' triggered" + '401': + description: Unauthorized + content: + application/json: + schema: { $ref: '#/components/schemas/Error' } + '404': + description: Job not found + content: + application/json: + schema: { $ref: '#/components/schemas/Error' } + '503': + description: Scheduler not running + content: + application/json: + schema: { $ref: '#/components/schemas/Error' } + /reminders/bills/{billId}/schedule: post: summary: Create bill reminders with default/override offsets @@ -587,3 +655,23 @@ components: message: { type: string } send_at: { type: string, format: date-time } channel: { type: string, enum: [email, whatsapp], default: email } + JobRecord: + type: object + properties: + name: { type: string } + status: { type: string, enum: [idle, running, success, failed] } + run_count: { type: integer } + success_count: { type: integer } + failure_count: { type: integer } + last_run: { type: string, format: date-time, nullable: true } + last_success: { type: string, format: date-time, nullable: true } + last_failure: { type: string, format: date-time, nullable: true } + last_error: { type: string, nullable: true } + JobStatus: + type: object + properties: + job_count: { type: integer } + jobs: + type: array + items: + $ref: '#/components/schemas/JobRecord' diff --git a/packages/backend/tests/conftest.py b/packages/backend/tests/conftest.py index a7315b8c9..a56e6ca84 100644 --- a/packages/backend/tests/conftest.py +++ b/packages/backend/tests/conftest.py @@ -7,6 +7,36 @@ from app import models # noqa: F401 - ensure models are registered +class InMemoryRedis: + """Minimal Redis test double for auth refresh-token operations.""" + + def __init__(self): + self._store = {} + + def setex(self, key, _ttl, value): + self._store[key] = value + return True + + def get(self, key): + return self._store.get(key) + + def delete(self, key): + existed = key in self._store + self._store.pop(key, None) + return int(existed) + + def flushdb(self): + self._store.clear() + return True + + +_test_redis = InMemoryRedis() +redis_client.setex = _test_redis.setex +redis_client.get = _test_redis.get +redis_client.delete = _test_redis.delete +redis_client.flushdb = _test_redis.flushdb + + class TestSettings(Settings): # Override defaults for tests database_url: str = "sqlite+pysqlite:///:memory:"