diff --git a/.gitignore b/.gitignore index 4d34874454..e2ab9f86fb 100644 --- a/.gitignore +++ b/.gitignore @@ -7,17 +7,20 @@ /.venv/ **/*.egg-info/ __pycache__ +/all_columns.txt /assets/dist /assets/stats.html /assets/src/scripts/outputs-viewer/coverage /coverage /docker/staticfiles/* +/excluded_columns.txt /htmlcov /jobserver.dump /jobserver.sqlite.zip node_modules /releases /repos +/sanitised_jobserver.dump /staticfiles /uploads /workspaces diff --git a/docker/justfile b/docker/justfile index 535b46f857..124beba611 100644 --- a/docker/justfile +++ b/docker/justfile @@ -90,4 +90,12 @@ restore-db dump="jobserver.dump": clean-volumes db #!/bin/bash set -eux path=$(realpath ../{{ dump }}) - docker compose run -v "$path:/tmp/jobserver.dump" --entrypoint bash --rm dev -c "pg_restore --clean --if-exists --no-acl --no-owner -d \$DATABASE_URL /tmp/jobserver.dump" + docker compose run -v "$path:/tmp/jobserver.dump" --entrypoint bash --rm dev -c ' + set -eux + pg_restore --clean --if-exists --no-acl --no-owner -d "$DATABASE_URL" /tmp/jobserver.dump + has_temp=$(psql "$DATABASE_URL" -tAc "SELECT 1 FROM information_schema.schemata WHERE schema_name = '\''temp_scrubbed_schema'\''") + if [ "$has_temp" = "1" ]; then + psql "$DATABASE_URL" -v ON_ERROR_STOP=1 -c "DROP SCHEMA IF EXISTS public CASCADE" + psql "$DATABASE_URL" -v ON_ERROR_STOP=1 -c "ALTER SCHEMA temp_scrubbed_schema RENAME TO public" + fi + ' diff --git a/jobserver/jobs/yearly/allow_list.json b/jobserver/jobs/yearly/allow_list.json new file mode 100644 index 0000000000..e9263cbcbc --- /dev/null +++ b/jobserver/jobs/yearly/allow_list.json @@ -0,0 +1,574 @@ +{ + "applications_application": [ + "id", + "has_agreed_to_terms", + "created_at", + "completed_at", + "created_by_id", + "project_id", + "approved_at", + "approved_by_id", + "status", + "status_comment", + "deleted_at", + "deleted_by_id", + "submitted_at", + "submitted_by_id" + ], + "applications_cmoprioritylistpage": [ + "id", + "notes", + "is_approved", + 
"last_reviewed_at", + "created_at", + "updated_at", + "is_on_cmo_priority_list", + "application_id", + "reviewed_by_id" + ], + "applications_commercialinvolvementpage": [ + "id", + "notes", + "is_approved", + "last_reviewed_at", + "created_at", + "updated_at", + "details", + "application_id", + "reviewed_by_id" + ], + "applications_contactdetailspage": [ + "id", + "notes", + "is_approved", + "last_reviewed_at", + "created_at", + "updated_at", + "full_name", + "job_title", + "team_name", + "organisation", + "application_id", + "reviewed_by_id" + ], + "applications_datasetspage": [ + "id", + "notes", + "is_approved", + "last_reviewed_at", + "created_at", + "updated_at", + "needs_icnarc", + "needs_isaric", + "needs_ons_cis", + "needs_phosp", + "application_id", + "reviewed_by_id", + "needs_ukrr" + ], + "applications_legalbasispage": [ + "id", + "notes", + "is_approved", + "last_reviewed_at", + "created_at", + "updated_at", + "legal_basis_for_accessing_data_under_dpa", + "how_is_duty_of_confidentiality_satisfied", + "application_id", + "reviewed_by_id" + ], + "applications_previousehrexperiencepage": [ + "id", + "notes", + "is_approved", + "last_reviewed_at", + "created_at", + "updated_at", + "previous_experience_with_ehr", + "application_id", + "reviewed_by_id" + ], + "applications_recordleveldatapage": [ + "id", + "notes", + "is_approved", + "last_reviewed_at", + "created_at", + "updated_at", + "record_level_data_reasons", + "application_id", + "reviewed_by_id" + ], + "applications_referencespage": [ + "id", + "notes", + "is_approved", + "last_reviewed_at", + "created_at", + "updated_at", + "hra_ires_id", + "hra_rec_reference", + "institutional_rec_reference", + "application_id", + "reviewed_by_id" + ], + "applications_researcherdetailspage": [ + "id", + "notes", + "is_approved", + "last_reviewed_at", + "created_at", + "updated_at", + "application_id", + "reviewed_by_id" + ], + "applications_researcherregistration": [ + "id", + "name", + "job_title", + 
"does_researcher_need_server_access", + "has_taken_safe_researcher_training", + "training_with_org", + "training_passed_at", + "created_at", + "application_id", + "user_id", + "daa", + "github_username" + ], + "applications_sharingcodepage": [ + "id", + "notes", + "is_approved", + "last_reviewed_at", + "created_at", + "updated_at", + "evidence_of_sharing_in_public_domain_before", + "application_id", + "reviewed_by_id" + ], + "applications_shortdatareportpage": [ + "id", + "notes", + "is_approved", + "last_reviewed_at", + "created_at", + "updated_at", + "application_id", + "reviewed_by_id" + ], + "applications_softwaredevelopmentexperiencepage": [ + "id", + "notes", + "is_approved", + "last_reviewed_at", + "created_at", + "updated_at", + "evidence_of_coding", + "all_applicants_completed_getting_started", + "application_id", + "reviewed_by_id" + ], + "applications_sponsordetailspage": [ + "id", + "notes", + "is_approved", + "last_reviewed_at", + "created_at", + "updated_at", + "sponsor_name", + "sponsor_job_role", + "institutional_rec_reference", + "application_id", + "reviewed_by_id", + "is_member_of_bennett_or_lshtm" + ], + "applications_studydatapage": [ + "id", + "notes", + "is_approved", + "last_reviewed_at", + "created_at", + "updated_at", + "data_meets_purpose", + "need_record_level_data", + "application_id", + "reviewed_by_id" + ], + "applications_studyfundingpage": [ + "id", + "notes", + "is_approved", + "last_reviewed_at", + "created_at", + "updated_at", + "funding_details", + "application_id", + "reviewed_by_id" + ], + "applications_studyinformationpage": [ + "id", + "notes", + "is_approved", + "last_reviewed_at", + "created_at", + "updated_at", + "study_name", + "study_purpose", + "application_id", + "reviewed_by_id", + "read_analytic_methods_policy" + ], + "applications_studypurposepage": [ + "id", + "notes", + "is_approved", + "last_reviewed_at", + "created_at", + "updated_at", + "description", + "author_name", + "author_organisation", + 
"application_id", + "reviewed_by_id", + "is_covid_prevention", + "is_covid_vaccine_effectiveness_or_safety", + "is_covid_vaccine_eligibility_or_coverage", + "is_other_impacts_of_covid", + "is_post_covid_health_impacts", + "is_risk_from_covid" + ], + "applications_teamdetailspage": [ + "id", + "notes", + "is_approved", + "last_reviewed_at", + "created_at", + "updated_at", + "team_details", + "application_id", + "reviewed_by_id" + ], + "applications_typeofstudypage": [ + "id", + "notes", + "is_approved", + "last_reviewed_at", + "created_at", + "updated_at", + "is_study_research", + "is_study_service_evaluation", + "is_study_audit", + "application_id", + "reviewed_by_id", + "is_study_short_data_report" + ], + "auth_group": [ + "id", + "name" + ], + "auth_group_permissions": [ + "id", + "group_id", + "permission_id" + ], + "auth_permission": [ + "id", + "content_type_id", + "codename", + "name" + ], + "django_content_type": [ + "id", + "app_label", + "model" + ], + "django_migrations": [ + "id", + "app", + "name", + "applied" + ], + + "django_session": [ + "session_key", + "expire_date" + ], + "jobserver_auditableevent": [ + "id", + "old", + "new", + "type", + "target_model", + "target_field", + "target_id", + "target_user", + "parent_model", + "parent_id", + "created_at", + "created_by" + ], + "jobserver_backend": [ + "id", + "name", + "created_at", + "updated_at", + "is_active", + "slug", + "alert_timeout", + "last_seen_at", + "last_seen_maintenance_mode", + "rap_api_state", + "is_in_maintenance_mode" + ], + "jobserver_backendmembership": [ + "id", + "created_at", + "backend_id", + "created_by_id", + "user_id" + ], + "jobserver_job": [ + "id", + "status_message", + "created_at", + "started_at", + "completed_at", + "job_request_id", + "action", + "updated_at", + "identifier", + "status", + "status_code", + "trace_context", + "run_command", + "metrics" + ], + "jobserver_jobrequest": [ + "id", + "force_run_dependencies", + "sha", + "identifier", + "created_at", + 
"backend_id", + "created_by_id", + "workspace_id", + "cancelled_actions", + "will_notify", + "project_definition", + "requested_actions", + "codelists_ok", + "_status", + "status_message" + ], + "jobserver_org": [ + "id", + "name", + "slug", + "created_at", + "description", + "logo", + "created_by_id", + "github_orgs", + "logo_file" + ], + "jobserver_orgmembership": [ + "id", + "created_by_id", + "org_id", + "user_id", + "created_at" + ], + "jobserver_project": [ + "id", + "slug", + "name", + "created_at", + "created_by_id", + "copilot_id", + "copilot_support_ends_at", + "number", + "updated_at", + "updated_by_id", + "status", + "status_description", + "copilot_notes" + ], + "jobserver_projectcollaboration": [ + "id", + "is_lead", + "org_id", + "project_id", + "created_at", + "created_by_id", + "updated_at", + "updated_by_id" + ], + "jobserver_projectmembership": [ + "id", + "project_id", + "user_id", + "created_at", + "created_by_id", + "roles" + ], + "jobserver_publishrequest": [ + "id", + "created_at", + "created_by_id", + "snapshot_id", + "workspace_id", + "decision", + "decision_at", + "decision_by_id", + "updated_at", + "updated_by_id" + ], + "jobserver_release": [ + "backend_id", + "workspace_id", + "created_by_id", + "created_at", + "requested_files", + "status", + "id", + "metadata", + "review" + ], + "jobserver_releasefile": [ + "id", + "name", + "path", + "release_id", + "workspace_id", + "created_at", + "created_by_id", + "filehash", + "deleted_at", + "deleted_by_id", + "size", + "mtime", + "uploaded_at", + "metadata" + ], + "jobserver_releasefilereview": [ + "id", + "status", + "comments", + "created_at", + "created_by_id", + "release_file_id" + ], + "jobserver_repo": [ + "id", + "url", + "researcher_signed_off_at", + "researcher_signed_off_by_id", + "internal_signed_off_at", + "internal_signed_off_by_id", + "has_github_outputs" + ], + "jobserver_sitealert": [ + "id", + "title", + "message", + "level", + "created_at", + "updated_at", + "created_by_id", 
+ "updated_by_id" + ], + "jobserver_snapshot": [ + "id", + "created_at", + "created_by_id", + "workspace_id" + ], + "jobserver_snapshot_files": [ + "id", + "snapshot_id", + "releasefile_id" + ], + "jobserver_stats": [ + "id", + "api_last_seen", + "backend_id", + "url" + ], + "jobserver_user": [ + "id", + "last_login", + "username", + "date_joined", + "fullname", + "created_by_id", + "roles" + ], + "jobserver_workspace": [ + "id", + "name", + "branch", + "is_archived", + "created_at", + "created_by_id", + "should_notify", + "project_id", + "uses_new_release_flow", + "repo_id", + "signed_off_at", + "signed_off_by_id", + "purpose", + "updated_at", + "updated_by_id" + ], + "redirects_redirect": [ + "id", + "old_url", + "expires_at", + "created_at", + "updated_at", + "deleted_at", + "created_by_id", + "deleted_by_id", + "project_id", + "workspace_id", + "org_id" + ], + "social_auth_association": [ + "id", + "server_url", + "handle", + "issued", + "lifetime", + "assoc_type" + ], + "social_auth_code": [ + "code", + "id", + "timestamp", + "verified" + ], + "social_auth_nonce": [ + "id", + "server_url", + "timestamp", + "salt" + ], + "social_auth_partial": [ + "id", + "next_step", + "backend", + "timestamp", + "data" + ], + "social_auth_usersocialauth": [ + "id", + "provider", + "user_id", + "created", + "modified", + "uid", + "extra_data" + ] +} diff --git a/jobserver/jobs/yearly/dump_sanitised_db.md b/jobserver/jobs/yearly/dump_sanitised_db.md new file mode 100644 index 0000000000..dbb8d742d4 --- /dev/null +++ b/jobserver/jobs/yearly/dump_sanitised_db.md @@ -0,0 +1,146 @@ +# Sanitised Database Dump Job + +## Content + +- [About](#about) +- [High-Level Algorithm](#high-level-algorithm) +- [Allow List](#allow-list) +- [Manually testing the script](#manually-testing-the-script) +- [Notes and Decisions](#notes-and-decisions) +- [FAQs](#faqs) + +## About + +This job exists so developers can work with a production‑like dataset without exposing sensitive information. 
Instead of copying the raw production database, it creates a temporary schema containing only safe columns (non‑allow‑listed columns are replaced with fake values) and dumps that schema for local use. + +The code lives in `jobserver/jobs/yearly/dump_sanitised_db.py` alongside its allow list (`allow_list.json`). The job currently inherits from `YearlyJob` so it only runs when triggered manually; once we are confident it is safe, we can switch it back to `DailyJob`. + +## High-Level Algorithm + +1. **Load configuration** + - Read `settings.DATABASES["default"]` to connect to production. + - Load `allow_list.json` (table → allowed columns). If it’s empty or missing, exit. +2. **Prepare output** + - Ensure `/storage` exists, then create a temporary file there (`tmp-sanitised-jobserver-dump-*`) so we never overwrite the existing dump until the job succeeds. +3. **Scratch schema lifecycle** + - Drop any leftover `temp_scrubbed_schema`. + - Inside a `try/finally`, create `temp_scrubbed_schema`, add a descriptive comment, and copy each allow-listed table: + * For each table, fetch the real column metadata from `information_schema`. + * Validate the table/column names (to avoid SQL injection) and create a matching table in the scratch schema using `CREATE TABLE … LIKE`. + * Build a single `INSERT … SELECT` per table. Allowed columns are copied verbatim; everything else gets a deterministic fake. + - After copying all tables, run `pg_dump --schema=temp_scrubbed_schema` into the temp file, then drop the scratch schema in the `finally` block so nothing remains on production. +4. **Publish the dump** + - Flush the temp file, `chmod 600`, and `shutil.copy2` it to `/storage/sanitised_jobserver.dump`. The temporary file is automatically deleted when the context manager exits. 
+ +```mermaid +flowchart TD + A[Dump Sanitised Database Job] --> B[Load DB settings + allow_list.json] + B --> C[Create temp file in /storage] + C --> D[Create temp_scrubbed_schema in DB] + D --> E{For each allow-listed table} + E --> F[Fetch column metadata from information_schema] + F --> G[CREATE TABLE temp_scrubbed_schema.table LIKE public.table] + G --> H[Insert rows: allowed columns verbatim, fake others] + H --> E + E --> I[pg_dump --schema=temp_scrubbed_schema -> temp file] + I --> J[Drop temp_scrubbed_schema] + J --> K[Rename temp file to sanitised_jobserver.dump] + K --> L[Developers download dump for local use] +``` + +## Allow List + +`allow_list.json` is a manually curated mapping of tables to the columns we want to keep verbatim. Everything else gets faked. To build it we: + +1. Dumped the production schema (`schema.sql`) and annotated each table with comments about which columns might contain sensitive data. +2. For each table, either marked it safe or listed the exact columns we wanted to retain. Any column omitted from the list is automatically replaced with fake values in the scratch schema. +3. Stored the final allow list under `jobserver/jobs/yearly/allow_list.json`. The integration test `tests/integration/test_dump_sanitised_db.py` checks that every table/column in the allow list still exists in the current schema so we notice if migrations drift. + +When adding new columns/tables, update the allow list accordingly. If a column is sensitive but referenced by the UI, we keep it in the schema but set its values to fake data (so the schema stays intact). + +If you ever need to see the columns that are **not** allow‑listed, run `scripts/get_excluded_columns.py`. First generate a column list from your database and then run the script to produce `excluded_columns.txt` which highlights every table/column pair we currently fake. Refer to script's docstring for more details on how to run the script. 
This is a useful audit tool when reviewing allow‑list changes. + +## Manually testing the script + +We can manually test this script by following these steps: + +1. Switch to a virtual environment + +2. Copy dotenv-sample and rename copy to .env + +``` +scripts/dev-env.sh .env +``` + +3. Start docker and install requirements + +``` +just docker/db +just devenv +``` + +4. In another terminal, download and restore the latest production database dump to a separate database. We need to do this so that the sanitised db script can use this production like db when creating a sanitised dump. In real case scenario, we would not need to do this as we will be pointing the db on dokku server. + +``` +scp dokku4:/var/lib/dokku/data/storage/job-server/jobserver.dump jobserver.dump +createdb -h localhost -p 6543 -U user jobserver_prod +pg_restore -h localhost -p 6543 -U user -d jobserver_prod --no-owner --no-acl jobserver.dump +``` + +5. Update `DATABASE_URL` in `.env` to point to this db + +``` +DATABASE_URL=postgres://user:pass@localhost:6543/jobserver_prod +``` + +6. Build assets + +``` +just assets-rebuild +``` + +7. Load changes in `.env` and run the sanitised db job to create a sanitised dump. +Make sure the `OUTPUT_PATH` mentioned in the script is available on your system. + +``` +set -a; source .env; set +a +just manage runjob dump_sanitised_db +``` + +8. Restore this sanitised db + +``` +just docker/restore-db sanitised_jobserver.dump +``` + +9. Update `DATABASE_URL` in `.env` to point back to db + +``` +DATABASE_URL=postgres://user:pass@localhost:6543/jobserver +``` + +10. Load changes in `.env` and run + +``` +set -a; source .env; set +a +just manage migrate +just run +``` + +## Notes and Decisions + +- We are leaving the legacy `dump_db` raw-dump script untouched for now. Once the sanitised job proves itself we can change `dump_db` to a normal script so an unsanitised dump is still available on demand (via an explicit run) when a debugging emergency requires it. 
+- SQL safety: all dynamic identifiers are validated and interpolated via `psycopg.sql.Identifier`/`sql.SQL`. Values use parameterized queries. This protects us against accidental SQL injection in allow list entries. +- Fake expressions use `ROW_NUMBER()` so they remain unique within each table copy (no collisions with `random()`). +- Tests: + * Unit tests cover helper functions (`tests/unit/jobserver/jobs/test_dump_sanitised_db.py`). + * Integration tests ensure each allow-listed table/column exists and that the scrubbed schema actually contains fake values (`tests/integration/test_dump_sanitised_db.py`). +- When ready to schedule this job automatically, switch the class back to `DailyJob`, update the Sentry monitor cron string, and add a `python manage.py runjobs daily` entry in `app.json`. Until then, keep running it manually after backups and verify the sanitised dump before distributing it to developers. + +## FAQs + +**What happens if a column is renamed/removed or its type changes?** +The integration test (`tests/integration/test_dump_sanitised_db.py`) queries `information_schema` to ensure every table/column in `allow_list.json` still exists. If a column is missing, the test fails before deployment. If it slips through and the job hits a missing column or unknown type, `_build_select_expressions`/`_fake_expression` raises an error, so the job aborts instead of producing a partial dump. This forces us to update the allow list (and fake-expression logic) whenever the schema changes. + +**What if a new column is added but not added to the allow list?** +The job treats it as “sensitive” by default: it copies the column but fills it with fake values. The dump remains structurally valid, but developers won’t see real data for that column until we explicitly add it to the allow list. This is an intentional safe default. 
diff --git a/jobserver/jobs/yearly/dump_sanitised_db.py b/jobserver/jobs/yearly/dump_sanitised_db.py new file mode 100644 index 0000000000..5174b21703 --- /dev/null +++ b/jobserver/jobs/yearly/dump_sanitised_db.py @@ -0,0 +1,300 @@ +import json +import os +import pathlib +import shutil +import subprocess +import sys +import tempfile + +from django.conf import settings +from django.db import connection +from django_extensions.management.jobs import YearlyJob +from psycopg import sql +from sentry_sdk.crons.decorator import monitor + +from services.sentry import monitor_config + + +# Temporary schema to hold safe copies +TEMP_SCHEMA = "temp_scrubbed_schema" +OUTPUT_PATH = pathlib.Path("/storage/sanitised_jobserver.dump") +ALLOWLIST_PATH = pathlib.Path(__file__).with_name("allow_list.json") +OUT_DIR = OUTPUT_PATH.parent + + +class Job(YearlyJob): + """ + Produces a sanitised jobserver dump for local development. + Tables/columns are controlled by allow_list.json and disallowed columns are replaced with fake values. + See dump_sanitised_db.md for more information about this script. + """ + + help = "Dump a safe copy of the DB with non-allowlisted columns replaced by fake values" + + # Keeping this job unscheduled for now; once we're confident we can switch it + # to a DailyJob and update the monitor schedule. 
+ @monitor( + monitor_slug="dump_sanitised_db", monitor_config=monitor_config("0 0 1 1 *") + ) + def execute(self): + """Create a scrubbed copy of the production DB and dump it for dev use.""" + db = settings.DATABASES["default"] + allowlist = self._load_allowlist(ALLOWLIST_PATH) + if not allowlist: + print( + "Unable to create a sanitised dump as allowlist is missing", + file=sys.stderr, + ) + sys.exit(1) + + if not OUT_DIR.is_dir(): + print(f"Unknown output directory: {OUT_DIR}", file=sys.stderr) + sys.exit(1) + + with tempfile.NamedTemporaryFile( + prefix="tmp-sanitised-jobserver-dump-", dir=str(OUT_DIR), delete=True + ) as tmp: + tmp_name = tmp.name + + # Ensure any leftover scratch schema from previous failures is removed + self._drop_temp_schema() + + try: + self._create_safe_schema_and_copy(allowlist) + self._run_pg_dump_for_schema(tmp_name, TEMP_SCHEMA, db) + finally: + self._drop_temp_schema() + + tmp.flush() + os.chmod(tmp_name, 0o600) + shutil.copy2(tmp_name, OUTPUT_PATH) + + def _load_allowlist(self, path: str | None) -> dict[str, list[str]]: + """Read allow_list.json and return a table -> allowed columns mapping.""" + if not path: + return {} + + try: + with open(path, encoding="utf-8") as file: + data = json.load(file) + allowlist_dict: dict[str, list[str]] = {} + for table_name, cols in data.items(): + if isinstance(cols, list) and cols: + allowlist_dict[str(table_name)] = [str(c) for c in cols] + return allowlist_dict + except FileNotFoundError: + print( + f"Allowlist file not found at {path}", + file=sys.stderr, + ) + return {} + except Exception as exc: + print(f"Error loading allowlist file {path}: {exc}", file=sys.stderr) + return {} + + def _fake_expression(self, table: str, col: str, meta: dict) -> str: + """Return SQL to generate a fake-but-valid value for the given column.""" + dtype = (meta.get("data_type") or "").lower() + + if "char" in dtype or "text" in dtype: + return f"'fake_{table}_{col}_' || ROW_NUMBER() OVER ()::text" + + if 
"integer" in dtype or "bigint" in dtype or "smallint" in dtype: + return "ROW_NUMBER() OVER ()" + + if "timestamp" in dtype or "date" in dtype: + return "now()" + + raise ValueError( + f"Unsupported data type '{dtype}' for {table}.{col}; add handling or allowlist it" + ) + + def _valid_ident(self, x: str) -> bool: + """Reject identifiers containing characters that could lead to SQL injection.""" + return bool(x) and (x.replace("_", "").isalnum()) and (not x[0].isdigit()) + + def _normalize_table_name(self, table: str) -> tuple[str, str]: + """Split dotted table names and default schema to public.""" + if "." in table: + schema_name, table_name = table.split(".", 1) + else: + schema_name = "public" + table_name = table + + if not self._valid_ident(table_name) or not self._valid_ident(schema_name): + raise ValueError(f"Invalid table name in allowlist: {table}") + + return schema_name, table_name + + def _get_column_metadata( + self, cur, schema_name: str, table_name: str + ) -> tuple[list[str], dict[str, dict[str, str]]]: + """Pull column order and metadata from information_schema for a table.""" + cur.execute( + sql.SQL( + """ + SELECT column_name, data_type + FROM information_schema.columns + WHERE table_schema = %s AND table_name = %s + ORDER BY ordinal_position; + """ + ), + [schema_name, table_name], + ) + rows = cur.fetchall() + if not rows: + return [], {} + + existing_cols = [row[0] for row in rows] + col_meta = {row[0]: {"data_type": row[1]} for row in rows} + return existing_cols, col_meta + + def _build_allowed_set(self, table: str, columns: list[str]) -> set[str]: + """Validate allow-listed column names before interpolating into SQL.""" + allowed_set: set[str] = set() + for col in columns: + if not self._valid_ident(col): + raise ValueError(f"Invalid column name in allowlist for {table}: {col}") + allowed_set.add(col) + return allowed_set + + def _create_safe_table(self, cur, src_table, dst_table) -> None: + """Create a destination table identical to the 
source.""" + create_sql = sql.SQL( + "CREATE TABLE IF NOT EXISTS {dst} (LIKE {src} INCLUDING ALL);" + ).format(dst=dst_table, src=src_table) + try: + cur.execute(create_sql) + except Exception as exc: + raise RuntimeError(f"Failed to create table {dst_table}: {exc}") + + def _build_select_expressions( + self, + table_name: str, + existing_cols: list[str], + allowed_set: set[str], + col_meta: dict[str, dict[str, str]], + ) -> tuple[list[str], str]: + """Produce SELECT expressions that copy allow-listed data and fake the rest.""" + select_exprs: list[str] = [] + missing_cols = allowed_set - set(existing_cols) + if missing_cols: + raise ValueError( + f"Allow list references missing columns for {table_name}: {sorted(missing_cols)}" + ) + for col in existing_cols: + if col in allowed_set: + select_exprs.append(f'"{col}"') + else: + expr = self._fake_expression(table_name, col, col_meta[col]) + select_exprs.append(f'{expr} AS "{col}"') + quoted_all_cols = ", ".join(f'"{c}"' for c in existing_cols) + return select_exprs, quoted_all_cols + + def _create_safe_schema_and_copy(self, allowlist: dict[str, list[str]]): + """Populate the temporary schema with scrubbed data for each table.""" + with connection.cursor() as cur: + cur.execute( + sql.SQL("DROP SCHEMA IF EXISTS {schema} CASCADE;").format( + schema=sql.Identifier(TEMP_SCHEMA) + ) + ) + cur.execute( + sql.SQL("CREATE SCHEMA {schema};").format( + schema=sql.Identifier(TEMP_SCHEMA) + ) + ) + cur.execute( + sql.SQL("COMMENT ON SCHEMA {schema} IS %s;").format( + schema=sql.Identifier(TEMP_SCHEMA) + ), + ( + "Temporary scrubbed copy of jobserver; used by dump_sanitised_db and dropped after the job finishes.", + ), + ) + + for table, columns in allowlist.items(): + if not columns: + continue + + schema_name, table_name = self._normalize_table_name(table) + existing_cols, col_meta = self._get_column_metadata( + cur, schema_name, table_name + ) + if not existing_cols: + continue + + allowed_set = self._build_allowed_set(table, 
columns) + original_table = sql.Identifier(schema_name, table_name) + temp_table = sql.Identifier(TEMP_SCHEMA, table_name) + + self._create_safe_table(cur, original_table, temp_table) + + select_exprs, quoted_all_cols = self._build_select_expressions( + table_name, existing_cols, allowed_set, col_meta + ) + + insert_sql = sql.SQL( + "INSERT INTO {dst} ({cols}) SELECT {select} FROM {src}" + ).format( + dst=temp_table, + cols=sql.SQL(quoted_all_cols), + select=sql.SQL(", ".join(select_exprs)), + src=original_table, + ) + cur.execute(insert_sql) + + # Keep the sequence aligned with the data we copied; otherwise, when the sanitised dump + # is restored, the sequences stay at 1 and inserts hit duplicate-key errors. + # We only run setval when the table has an integer/serial `id` column. + id_meta = col_meta.get("id") + if ( + "id" in existing_cols + and id_meta + and any( + token in id_meta["data_type"] for token in ("int", "serial") + ) + ): + cur.execute( + sql.SQL( + """ + SELECT setval( + pg_get_serial_sequence(%s, 'id'), + COALESCE((SELECT MAX(id)::bigint FROM {table}), 1), + true + ) + """ + ).format(table=temp_table), + (f"{TEMP_SCHEMA}.{table_name}",), + ) + + def _drop_temp_schema(self): + """Drop the scratch schema if it exists.""" + with connection.cursor() as cur: + cur.execute( + sql.SQL("DROP SCHEMA IF EXISTS {schema} CASCADE;").format( + schema=sql.Identifier(TEMP_SCHEMA) + ) + ) + + def _run_pg_dump_for_schema(self, outfile: str, schema: str, db: dict): + """Use pg_dump to export only the sanitised schema to the temp file.""" + conn_uri = f"postgresql://{db['USER']}@{db['HOST']}:{db['PORT']}/{db['NAME']}" + env = os.environ.copy() + if db.get("PASSWORD"): + env["PGPASSWORD"] = db["PASSWORD"] + + cmd = [ + "pg_dump", + "--format=c", + "--no-acl", + "--no-owner", + f"--file={outfile}", + f"--schema={schema}", + conn_uri, + ] + res = subprocess.run(cmd, env=env, capture_output=True, text=True) + if res.returncode != 0: + raise RuntimeError( + f"pg_dump 
failed: returncode={res.returncode}; stderr={res.stderr}" + ) diff --git a/scripts/get_excluded_columns.py b/scripts/get_excluded_columns.py new file mode 100644 index 0000000000..4a60676795 --- /dev/null +++ b/scripts/get_excluded_columns.py @@ -0,0 +1,64 @@ +#!/usr/bin/env python +""" +Print the columns that are excluded (i.e., faked) by dump_sanitised_db. + +Usage: + 1. Create a database and restore the latest production database dump into it. + + 2. Run the helper SQL query against your database to list all columns: + psql "$PROD_DATABASE_URL" -tAc " + SELECT table_name, column_name + FROM information_schema.columns + WHERE table_schema = 'public'; + " > all_columns.txt + + 3. Then run this script, pointing it at that column list: + python scripts/get_excluded_columns.py all_columns.txt +""" + +from __future__ import annotations + +import argparse +import json +from collections import defaultdict +from pathlib import Path + + +ALLOW_LIST_PATH = Path("jobserver/jobs/yearly/allow_list.json") + + +def main() -> None: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + "column_file", + type=Path, + help="Path to a text file containing 'table|column' lines.", + ) + args = parser.parse_args() + + with ALLOW_LIST_PATH.open(encoding="utf-8") as f: + allow = json.load(f) + + allowed = {table: set(cols) for table, cols in allow.items()} + excluded: dict[str, list[str]] = defaultdict(list) + + with args.column_file.open(encoding="utf-8") as f: + for line in f: + line = line.strip() + if not line: + continue + table, column = line.split("|", 1) + allowed_cols = allowed.get(table, set()) + if not allowed_cols or column not in allowed_cols: + excluded[table].append(column) + + output_path = Path("excluded_columns.txt") + with output_path.open("w", encoding="utf-8") as out: + for table in sorted(excluded): + cols = ", ".join(sorted(excluded[table])) + out.write(f"{table}: {cols}\n") + print(f"Wrote excluded column list to {output_path.resolve()}") 
if __name__ == "__main__":
    main()


# --- tests/integration/test_dump_sanitised_db.py (new file) ---
import pytest
from django.contrib.auth import get_user_model
from django.db import connection

from jobserver.jobs.yearly import dump_sanitised_db


@pytest.mark.django_db
def test_allowlist_tables_and_columns_exist():
    """Ensure every allow-listed table/column exists in the database."""
    job = dump_sanitised_db.Job()
    allowlist = job._load_allowlist(dump_sanitised_db.ALLOWLIST_PATH)

    with connection.cursor() as cur:
        for table, columns in allowlist.items():
            schema_name, table_name = job._normalize_table_name(table)
            cur.execute(
                """
                SELECT column_name
                FROM information_schema.columns
                WHERE table_schema = %s AND table_name = %s
                """,
                [schema_name, table_name],
            )
            rows = cur.fetchall()
            assert rows, f"Table {schema_name}.{table_name} from allow list not found"

            existing_cols = {row[0] for row in rows}
            missing = [col for col in columns if col not in existing_cols]
            assert not missing, (
                f"Table {schema_name}.{table_name} missing columns: {missing}"
            )


@pytest.mark.django_db
def test_create_safe_schema_scrubs_disallowed_columns():
    """Copying into the temp schema keeps allow-listed data and fakes everything else."""
    job = dump_sanitised_db.Job()
    user_model = get_user_model()
    user = user_model.objects.create(
        username="testuser",
        email="testuser@gmail.com",
        fullname="Real Name",
        password="secret",
        roles=[],
    )

    # Only these columns survive; email/password must come back faked.
    allowlist = {"jobserver_user": ["id", "username", "roles", "fullname"]}

    job._drop_temp_schema()
    try:
        job._create_safe_schema_and_copy(allowlist)

        with connection.cursor() as cur:
            cur.execute(
                """
                SELECT id, username, email, password
                FROM "temp_scrubbed_schema"."jobserver_user"
                WHERE id = %s
                """,
                [user.id],
            )
            row = cur.fetchone()
            assert row is not None
            assert row[0] == user.id
            assert row[1] == user.username
            assert row[2].startswith("fake_jobserver_user_email_")
            assert row[3].startswith("fake_jobserver_user_password_")
    finally:
        # Always clean up the temp schema, even if assertions fail.
        job._drop_temp_schema()


# --- tests/unit/jobserver/jobs/__init__.py (new, empty file) ---


# --- tests/unit/jobserver/jobs/test_dump_sanitised_db.py (new file) ---
import json

import pytest
from django.db import connection
from psycopg import sql

from jobserver.jobs.yearly import dump_sanitised_db


class StubCursor:
    """Minimal cursor double: records execute() calls and returns canned rows."""

    def __init__(self, rows):
        self._rows = rows
        self.executed = []

    # NOTE: parameter named `statement` (not `sql`) so it does not shadow the
    # module-level `psycopg.sql` import.
    def execute(self, statement, params):
        self.executed.append((statement, params))

    def fetchall(self):
        return self._rows


class RecordingCursor:
    """Cursor double for SQL-composition tests: records composed statements."""

    def __init__(self):
        self.executed = []
        # Real connection so recorded sql objects can be rendered via
        # .as_string(connection) in assertions.
        self.connection = connection

    def execute(self, statement):
        self.executed.append(statement)


@pytest.fixture
def job():
    return dump_sanitised_db.Job()


def test_load_allowlist_success(tmp_path, job):
    path = tmp_path / "allow_list.json"
    path.write_text(
        json.dumps({"users": ["id", "email"], "jobs": ["id"]}), encoding="utf-8"
    )

    allowlist = job._load_allowlist(str(path))

    assert allowlist == {"users": ["id", "email"], "jobs": ["id"]}


def test_load_allowlist_missing_file_returns_empty(tmp_path, job, capsys):
    # A path under a fresh tmp_path is guaranteed not to exist, unlike the
    # previous hard-coded /tmp path which could collide with a real file.
    path = str(tmp_path / "does-not-exist.json")

    assert job._load_allowlist(path) == {}
    captured = capsys.readouterr()
    assert f"Allowlist file not found at {path}" in captured.err


def test_load_allowlist_logs_error_on_bad_json(tmp_path, job, capsys):
    bad_file = tmp_path / "allow_list.json"
    bad_file.write_text("{not valid json}", encoding="utf-8")

    result = job._load_allowlist(str(bad_file))

    assert result == {}
    captured = capsys.readouterr()
    assert "Error loading allowlist file" in captured.err


def test_fake_expression_char_column(job):
    expr = job._fake_expression("users", "email", {"data_type": "character varying"})

    assert "fake_users_email" in expr
    assert "ROW_NUMBER() OVER ()" in expr


def test_fake_expression_integer_column(job):
    expr = job._fake_expression("stats", "count", {"data_type": "integer"})

    assert "ROW_NUMBER() OVER ()" in expr


def test_fake_expression_timestamp_column(job):
    expr = job._fake_expression("events", "created_at", {"data_type": "timestamp"})

    assert expr == "now()"


def test_fake_expression_unknown_type(job):
    with pytest.raises(ValueError):
        job._fake_expression("invalid", "payload", {"data_type": "bytea"})


def test_valid_ident(job):
    assert job._valid_ident("safe_name")
    assert not job._valid_ident("1starts_with_digit")
    assert not job._valid_ident("danger!zone")


def test_normalize_table_name_with_schema(job):
    schema, table = job._normalize_table_name("special.schema_table")

    assert schema == "special"
    assert table == "schema_table"


def test_normalize_table_name_without_schema(job):
    schema, table = job._normalize_table_name("plain_table")

    assert schema == "public"
    assert table == "plain_table"


def test_normalize_table_name_rejects_invalid_identifiers(job):
    with pytest.raises(ValueError):
        job._normalize_table_name("bad-schema.bad$table")


def test_get_column_metadata_returns_list(job):
    cursor = StubCursor([("id", "integer"), ("email", "character varying")])

    cols, meta = job._get_column_metadata(cursor, "public", "users")

    assert cols == ["id", "email"]
    assert meta["email"]["data_type"] == "character varying"
    assert cursor.executed


def test_get_column_metadata_handles_missing_table(job):
    cols, meta = job._get_column_metadata(StubCursor([]), "public", "nonexistent")

    assert cols == []
    assert meta == {}


def test_build_allowed_set_returns_unique_columns(job):
    allowed = job._build_allowed_set("table", ["col_a", "col_b", "col_a"])

    assert allowed == {"col_a", "col_b"}


def test_build_allowed_set_rejects_invalid_columns(job):
    with pytest.raises(ValueError):
        job._build_allowed_set("table", ["valid", "not-ok!"])


def test_create_safe_table_builds_sql(job):
    cursor = RecordingCursor()

    src = sql.Identifier("public", "users")
    dst = sql.Identifier("temp", "users")

    job._create_safe_table(cursor, src, dst)

    assert cursor.executed
    sql_text = cursor.executed[0].as_string(cursor.connection)
    assert 'CREATE TABLE IF NOT EXISTS "temp"."users"' in sql_text


def test_build_select_expressions_copies_allowed(job):
    cols = ["id", "email"]
    allowed = {"id"}
    meta = {"id": {"data_type": "integer"}, "email": {"data_type": "text"}}

    select_exprs, quoted_cols = job._build_select_expressions(
        "users", cols, allowed, meta
    )
    assert quoted_cols == '"id", "email"'
    assert select_exprs[0] == '"id"'
    assert "fake_users_email" in select_exprs[1]


def test_build_select_expressions_missing_allowlisted_column_raises_error(job):
    cols = ["id", "email"]
    allowed = {"missing"}
    meta = {"id": {"data_type": "integer"}, "email": {"data_type": "text"}}

    with pytest.raises(ValueError):
        job._build_select_expressions("users", cols, allowed, meta)