diff --git a/docs/operations/data_migration.md b/docs/operations/data_migration.md index 499c0840f..b64b5ea76 100644 --- a/docs/operations/data_migration.md +++ b/docs/operations/data_migration.md @@ -953,3 +953,40 @@ To create a specific portfolio: #### Step 1: Running the script ```docker-compose exec app ./manage.py patch_suborganizations``` + + +## Remove Non-whitelisted Portfolios +This script removes Portfolio entries from the database that are not part of a predefined list of allowed portfolios (`ALLOWED_PORTFOLIOS`). +It performs the following actions: +1. Prompts the user for confirmation before proceeding with deletions. +2. Updates related objects such as `DomainInformation`, `Domain`, and `DomainRequest` to set their `portfolio` field to `None` to prevent integrity errors. +3. Deletes associated objects such as `PortfolioInvitation`, `UserPortfolioPermission`, and `Suborganization`. +4. Logs a detailed summary of all cascading deletions and orphaned objects. + +### Running on sandboxes + +#### Step 1: Login to CloudFoundry +```cf login -a api.fr.cloud.gov --sso``` + +#### Step 2: SSH into your environment +```cf ssh getgov-{space}``` + +Example: `cf ssh getgov-nl` + +#### Step 3: Create a shell instance +```/tmp/lifecycle/shell``` + +#### Step 4: Running the script +To remove portfolios: +```./manage.py remove_unused_portfolios``` + +If you wish to enable debug mode for additional logging: +```./manage.py remove_unused_portfolios --debug``` + +### Running locally + +#### Step 1: Running the script +```docker-compose exec app ./manage.py remove_unused_portfolios``` + +To enable debug mode locally: +```docker-compose exec app ./manage.py remove_unused_portfolios --debug``` \ No newline at end of file diff --git a/src/registrar/management/commands/remove_unused_portfolios.py b/src/registrar/management/commands/remove_unused_portfolios.py new file mode 100644 index 000000000..4940cc16f --- /dev/null +++ b/src/registrar/management/commands/remove_unused_portfolios.py @@ -0,0 +1,238 @@ +import argparse +import logging + +from django.core.management.base import BaseCommand +from django.db import IntegrityError +from django.db import transaction +from registrar.management.commands.utility.terminal_helper import ( + TerminalColors, + TerminalHelper, +) +from registrar.models import ( + Portfolio, + DomainGroup, + DomainInformation, + DomainRequest, + PortfolioInvitation, + Suborganization, + UserPortfolioPermission, +) + +logger = logging.getLogger(__name__) + +ALLOWED_PORTFOLIOS = [ + "Department of Veterans Affairs", + "Department of the Treasury", + "National Archives and Records Administration", + "Department of Defense", + "Office of Personnel Management", + "National Aeronautics and Space Administration", + "City and County of San Francisco", + "State of Arizona, Executive Branch", + "Department of the Interior", + "Department of State", + "Department of Justice", + "Capitol Police", + "Administrative Office of the Courts", + "Supreme Court of the United States", +] + + +class Command(BaseCommand): + help = "Remove all Portfolio entries with names not in the allowed list." + + def add_arguments(self, parser): + """ + OPTIONAL ARGUMENTS: + --debug + A boolean (default to true), which activates additional print statements + """ + parser.add_argument("--debug", action=argparse.BooleanOptionalAction) + + def prompt_delete_entries(self, portfolios_to_delete, debug_on): + """Brings up a prompt in the terminal asking + if the user wishes to delete data in the + Portfolio table. If the user confirms, + deletes the data in the Portfolio table""" + + entries_to_remove_by_name = list(portfolios_to_delete.values_list("organization_name", flat=True)) + formatted_entries = "\n\t\t".join(entries_to_remove_by_name) + confirm_delete = TerminalHelper.query_yes_no( + f""" + {TerminalColors.FAIL} + WARNING: You are about to delete the following portfolios: + + {formatted_entries} + + Are you sure you want to continue?{TerminalColors.ENDC}""" + ) + if confirm_delete: + logger.info( + f"""{TerminalColors.YELLOW} + ----------Deleting entries---------- + (please wait) + {TerminalColors.ENDC}""" + ) + self.delete_entries(portfolios_to_delete, debug_on) + else: + logger.info( + f"""{TerminalColors.OKCYAN} + ----------No entries deleted---------- + (exiting script) + {TerminalColors.ENDC}""" + ) + + def delete_entries(self, portfolios_to_delete, debug_on): # noqa: C901 + # Log the number of entries being removed + count = portfolios_to_delete.count() + if count == 0: + logger.info( + f"""{TerminalColors.OKCYAN} + No entries to remove. + {TerminalColors.ENDC} + """ + ) + return + + # If debug mode is on, print out entries being removed + if debug_on: + entries_to_remove_by_name = list(portfolios_to_delete.values_list("organization_name", flat=True)) + formatted_entries = ", ".join(entries_to_remove_by_name) + logger.info( + f"""{TerminalColors.YELLOW} + Entries to be removed: {formatted_entries} + {TerminalColors.ENDC} + """ + ) + + # Check for portfolios with non-empty related objects + # (These will throw integrity errors if they are not updated) + portfolios_with_assignments = [] + for portfolio in portfolios_to_delete: + has_assignments = any( + [ + DomainGroup.objects.filter(portfolio=portfolio).exists(), + DomainInformation.objects.filter(portfolio=portfolio).exists(), + DomainRequest.objects.filter(portfolio=portfolio).exists(), + PortfolioInvitation.objects.filter(portfolio=portfolio).exists(), + Suborganization.objects.filter(portfolio=portfolio).exists(), + UserPortfolioPermission.objects.filter(portfolio=portfolio).exists(), + ] + ) + if has_assignments: + portfolios_with_assignments.append(portfolio) + + if portfolios_with_assignments: + formatted_entries = "\n\t\t".join( + f"{portfolio.organization_name}" for portfolio in portfolios_with_assignments + ) + confirm_cascade_delete = TerminalHelper.query_yes_no( + f""" + {TerminalColors.FAIL} + WARNING: these entries have related objects. + + {formatted_entries} + + Deleting them will update any associated domains / domain requests to have no portfolio + and will cascade delete any associated portfolio invitations, portfolio permissions, domain groups, + and suborganizations. Any suborganizations that get deleted will also orphan (not delete) their + associated domains / domain requests. + + Are you sure you want to continue?{TerminalColors.ENDC}""" + ) + if not confirm_cascade_delete: + logger.info( + f"""{TerminalColors.OKCYAN} + Operation canceled by the user. + {TerminalColors.ENDC} + """ + ) + return + + with transaction.atomic(): + # Try to delete the portfolios + try: + summary = [] + for portfolio in portfolios_to_delete: + portfolio_summary = [f"---- CASCADE SUMMARY for {portfolio.organization_name} -----"] + if portfolio in portfolios_with_assignments: + domain_groups = DomainGroup.objects.filter(portfolio=portfolio) + domain_informations = DomainInformation.objects.filter(portfolio=portfolio) + domain_requests = DomainRequest.objects.filter(portfolio=portfolio) + portfolio_invitations = PortfolioInvitation.objects.filter(portfolio=portfolio) + suborganizations = Suborganization.objects.filter(portfolio=portfolio) + user_permissions = UserPortfolioPermission.objects.filter(portfolio=portfolio) + + if domain_groups.exists(): + formatted_groups = "\n".join([str(group) for group in domain_groups]) + portfolio_summary.append(f"{len(domain_groups)} Deleted DomainGroups:\n{formatted_groups}") + domain_groups.delete() + + if domain_informations.exists(): + formatted_domain_infos = "\n".join([str(info) for info in domain_informations]) + portfolio_summary.append( + f"{len(domain_informations)} Orphaned DomainInformations:\n{formatted_domain_infos}" + ) + domain_informations.update(portfolio=None) + + if domain_requests.exists(): + formatted_domain_reqs = "\n".join([str(req) for req in domain_requests]) + portfolio_summary.append( + f"{len(domain_requests)} Orphaned DomainRequests:\n{formatted_domain_reqs}" + ) + domain_requests.update(portfolio=None) + + if portfolio_invitations.exists(): + formatted_portfolio_invitations = "\n".join([str(inv) for inv in portfolio_invitations]) + portfolio_summary.append( + f"{len(portfolio_invitations)} Deleted PortfolioInvitations:\n{formatted_portfolio_invitations}" # noqa + ) + portfolio_invitations.delete() + + if user_permissions.exists(): + formatted_user_list = "\n".join( + [perm.user.get_formatted_name() for perm in user_permissions] + ) + portfolio_summary.append( + f"Deleted UserPortfolioPermissions for the following users:\n{formatted_user_list}" + ) + user_permissions.delete() + + if suborganizations.exists(): + portfolio_summary.append("Cascade Deleted Suborganizations:") + for suborg in suborganizations: + DomainInformation.objects.filter(sub_organization=suborg).update(sub_organization=None) + DomainRequest.objects.filter(sub_organization=suborg).update(sub_organization=None) + portfolio_summary.append(f"{suborg.name}") + suborg.delete() + + portfolio.delete() + summary.append("\n\n".join(portfolio_summary)) + summary_string = "\n\n".join(summary) + + # Output a success message with detailed summary + logger.info( + f"""{TerminalColors.OKCYAN} + Successfully removed {count} portfolios. + + The following portfolio deletions had cascading effects; + + {summary_string} + {TerminalColors.ENDC} + """ + ) + + except IntegrityError as e: + logger.info( + f"""{TerminalColors.FAIL} + Could not delete some portfolios due to integrity constraints: + {e} + {TerminalColors.ENDC} + """ + ) + + def handle(self, *args, **options): + # Get all Portfolio entries not in the allowed portfolios list + portfolios_to_delete = Portfolio.objects.exclude(organization_name__in=ALLOWED_PORTFOLIOS) + + self.prompt_delete_entries(portfolios_to_delete, options.get("debug")) diff --git a/src/registrar/tests/test_management_scripts.py b/src/registrar/tests/test_management_scripts.py index 536d1e760..334d7d83c 100644 --- a/src/registrar/tests/test_management_scripts.py +++ b/src/registrar/tests/test_management_scripts.py @@ -3,7 +3,10 @@ from datetime import date, datetime, time from django.core.management import call_command from django.test import TestCase, override_settings +from registrar.models.domain_group import DomainGroup +from registrar.models.portfolio_invitation import PortfolioInvitation from registrar.models.senior_official import SeniorOfficial +from registrar.models.user_portfolio_permission import UserPortfolioPermission from registrar.utility.constants import BranchChoices from django.utils import timezone from django.utils.module_loading import import_string @@ -2167,3 +2170,111 @@ def test_reference_updates(self): self.assertEqual(self.domain_information_1.sub_organization, keep_org) self.assertEqual(self.domain_request_2.sub_organization, unrelated_org) self.assertEqual(self.domain_information_2.sub_organization, unrelated_org) + + +class TestRemovePortfolios(TestCase): + """Test the remove_unused_portfolios command""" + + def setUp(self): + self.user = User.objects.create(username="testuser") + + self.logger_patcher = patch("registrar.management.commands.export_tables.logger") + self.logger_mock = self.logger_patcher.start() + + # Create mock database objects + self.portfolio_ok = Portfolio.objects.create( + organization_name="Department of Veterans Affairs", creator=self.user + ) + self.unused_portfolio_with_related_objects = Portfolio.objects.create( + organization_name="Test with orphaned objects", creator=self.user + ) + self.unused_portfolio_with_suborgs = Portfolio.objects.create( + organization_name="Test with suborg", creator=self.user + ) + + # Create related objects for unused_portfolio_with_related_objects + self.domain_information = DomainInformation.objects.create( + portfolio=self.unused_portfolio_with_related_objects, creator=self.user + ) + self.domain_request = DomainRequest.objects.create( + portfolio=self.unused_portfolio_with_related_objects, creator=self.user + ) + self.inv = PortfolioInvitation.objects.create(portfolio=self.unused_portfolio_with_related_objects) + self.group = DomainGroup.objects.create( + portfolio=self.unused_portfolio_with_related_objects, name="Test Domain Group" + ) + self.perm = UserPortfolioPermission.objects.create( + portfolio=self.unused_portfolio_with_related_objects, user=self.user + ) + + # Create a suborganization and suborg related objects for unused_portfolio_with_suborgs + self.suborganization = Suborganization.objects.create( + portfolio=self.unused_portfolio_with_suborgs, name="Test Suborg" + ) + self.suborg_domain_information = DomainInformation.objects.create( + sub_organization=self.suborganization, creator=self.user + ) + + def tearDown(self): + self.logger_patcher.stop() + + @patch("registrar.management.commands.utility.terminal_helper.TerminalHelper.query_yes_no") + def test_delete_unlisted_portfolios(self, mock_query_yes_no): + """Test that portfolios not on the allowed list are deleted.""" + mock_query_yes_no.return_value = True + + # Ensure all portfolios exist before running the command + self.assertEqual(Portfolio.objects.count(), 3) + + # Run the command + call_command("remove_unused_portfolios", debug=False) + + # Check that the unlisted portfolio was removed + self.assertEqual(Portfolio.objects.count(), 1) + self.assertFalse(Portfolio.objects.filter(organization_name="Test with orphaned objects").exists()) + self.assertFalse(Portfolio.objects.filter(organization_name="Test with suborg").exists()) + self.assertTrue(Portfolio.objects.filter(organization_name="Department of Veterans Affairs").exists()) + + @patch("registrar.management.commands.utility.terminal_helper.TerminalHelper.query_yes_no") + def test_delete_entries_with_related_objects(self, mock_query_yes_no): + """Test deletion with related objects being handled properly.""" + mock_query_yes_no.return_value = True + + # Ensure related objects exist before running the command + self.assertEqual(DomainInformation.objects.count(), 2) + self.assertEqual(DomainRequest.objects.count(), 1) + + # Run the command + call_command("remove_unused_portfolios", debug=False) + + # Check that related objects were updated + self.assertEqual( + DomainInformation.objects.filter(portfolio=self.unused_portfolio_with_related_objects).count(), 0 + ) + self.assertEqual(DomainRequest.objects.filter(portfolio=self.unused_portfolio_with_related_objects).count(), 0) + self.assertEqual(DomainInformation.objects.filter(portfolio=None).count(), 2) + self.assertEqual(DomainRequest.objects.filter(portfolio=None).count(), 1) + + # Check that the portfolio was deleted + self.assertFalse(Portfolio.objects.filter(organization_name="Test with orphaned objects").exists()) + + @patch("registrar.management.commands.utility.terminal_helper.TerminalHelper.query_yes_no") + def test_delete_entries_with_suborganizations(self, mock_query_yes_no): + """Test that suborganizations and their related objects are deleted along with the portfolio.""" + mock_query_yes_no.return_value = True + + # Ensure suborganization and related objects exist before running the command + self.assertEqual(Suborganization.objects.count(), 1) + self.assertEqual(DomainInformation.objects.filter(sub_organization=self.suborganization).count(), 1) + + # Run the command + call_command("remove_unused_portfolios", debug=False) + + # Check that the suborganization was deleted + self.assertEqual(Suborganization.objects.filter(portfolio=self.unused_portfolio_with_suborgs).count(), 0) + + # Check that deletion of suborganization had cascading effects (orphaned DomainInformation) + self.assertEqual(DomainInformation.objects.filter(sub_organization=self.suborganization).count(), 0) + + # Check that the portfolio was deleted + self.assertFalse(Portfolio.objects.filter(organization_name="Test with suborg").exists())