diff --git a/jobs/epic-cron/config.py b/jobs/epic-cron/config.py index 9bb7245..421f0c8 100644 --- a/jobs/epic-cron/config.py +++ b/jobs/epic-cron/config.py @@ -109,6 +109,7 @@ class _Config(): # pylint: disable=too-few-public-methods ENABLE_DETAILED_LOGS = os.getenv("ENABLE_DETAILED_LOGS", "false").lower() == "true" TRACK_API_BASE_URL=os.getenv('TRACK_API_BASE_URL') + CONDITION_API_BASE_URL = os.getenv("CONDITION_API_BASE_URL") KEYCLOAK_BASE_URL = os.getenv('KEYCLOAK_BASE_URL') KEYCLOAK_REALM_NAME = os.getenv('KEYCLOAK_REALM_NAME', 'eao-epic') KEYCLOAK_SERVICE_ACCOUNT_ID = os.getenv('KEYCLOAK_SERVICE_ACCOUNT_ID') diff --git a/jobs/epic-cron/invoke_jobs.py b/jobs/epic-cron/invoke_jobs.py index 736e588..5aa0226 100644 --- a/jobs/epic-cron/invoke_jobs.py +++ b/jobs/epic-cron/invoke_jobs.py @@ -5,6 +5,7 @@ from utils.logger import setup_logging import config from tasks.project_extractor import ProjectExtractor, TargetSystem # Import the enum +from tasks.proponent_extractor import ProponentExtractor from tasks.virus_scanner import VirusScanner setup_logging(os.path.join(os.path.abspath(os.path.dirname(__file__)), 'logging.conf')) # important to do this first @@ -40,6 +41,12 @@ def run(job_name, target_system=None, file_path=None): with application.app_context(): if job_name == 'EXTRACT_PROJECT': + # For SUBMIT, we must sync proponents first as they are dependencies + if target_system == TargetSystem.SUBMIT: + print(f'Running Proponent Extractor for {target_system.value}...') + ProponentExtractor.do_sync() + application.logger.info(f'<<<< Completed Proponent Sync for {target_system.value} >>>') + print(f'Running Project Extractor for {target_system.value}...') ProjectExtractor.do_sync(target_system=target_system) application.logger.info(f'<<<< Completed Project Sync for {target_system.value} >>>') diff --git a/jobs/epic-cron/src/epic_cron/services/approved_condition_service.py b/jobs/epic-cron/src/epic_cron/services/approved_condition_service.py new file mode 100644 
# index 0000000..d4f5c4a
# --- /dev/null
# +++ b/jobs/epic-cron/src/epic_cron/services/approved_condition_service.py
# (new file; line structure restored and the diff "+" markers dropped)
import requests
from flask import current_app
from submit_api.models.project import Project as SubmitProjectModel


class ApprovedConditionService:
    """Service to interact with the Condition API."""

    @staticmethod
    def sync_approved_conditions(session):
        """Flag local projects that the Condition API reports as having approved conditions.

        Calls ``{CONDITION_API_BASE_URL}/api/projects/with-approved-conditions``
        with a Keycloak bearer token, sets ``has_approved_condition = True`` on
        every local project whose ``epic_guid`` appears in the response, and
        commits. Proponent statuses are NOT changed here; the proponent IDs are
        returned so the caller can update them.

        Args:
            session: Database session exposing ``query``/``commit``/``rollback``.
                It is committed on success and rolled back on an unexpected
                error.

        Returns:
            set: ``proponent_id`` of every matched project that has one
            (including projects that were already flagged). An empty set is
            returned when the sync is skipped (URL not configured, no token,
            no GUIDs in the response) or when the HTTP call fails.

        Raises:
            Exception: Any error other than ``requests.RequestException`` is
                re-raised after the session has been rolled back.
        """
        condition_api_base_url = current_app.config.get("CONDITION_API_BASE_URL")
        if not condition_api_base_url:
            print("WARNING: CONDITION_API_BASE_URL not configured. Skipping Condition API sync.")
            # Always hand back a set so every exit path has the same return type.
            return set()

        approved_projects_endpoint = f"{condition_api_base_url}/api/projects/with-approved-conditions"

        token = ApprovedConditionService._get_admin_token()
        if not token:
            print("Failed to fetch authorization token for Condition API.")
            return set()

        headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json"
        }

        print(f"Fetching projects from Condition API: {approved_projects_endpoint}")
        try:
            response = requests.get(approved_projects_endpoint, headers=headers, timeout=30)
            response.raise_for_status()
            # Assumed to be a JSON array of objects carrying an "epic_guid" key
            # -- TODO confirm against the Condition API contract.
            projects_data = response.json()

            print(f"Condition API returned {len(projects_data)} projects.")

            epic_guids = [p.get("epic_guid") for p in projects_data if p.get("epic_guid")]
            if not epic_guids:
                return set()

            print(f"Updating projects matching {len(epic_guids)} GUIDs...")
            projects_to_update = session.query(SubmitProjectModel).filter(
                SubmitProjectModel.epic_guid.in_(epic_guids)
            ).all()

            proponent_ids_to_update = set()
            updated_count = 0

            for project in projects_to_update:
                if not project.has_approved_condition:
                    project.has_approved_condition = True
                    updated_count += 1

                # Collect proponent IDs for the status update: if at least one
                # of a proponent's projects has an approved condition, the
                # proponent is eligible for onboarding.
                if project.proponent_id:
                    proponent_ids_to_update.add(project.proponent_id)

            print(f"Updated has_approved_condition for {updated_count} projects.")

            # The proponent update itself is done by a separate task using the
            # IDs returned below.
            session.commit()
            print(f"Successfully synced approved conditions. Returning {len(proponent_ids_to_update)} proponent IDs.")

            return proponent_ids_to_update

        except requests.RequestException as e:
            # HTTP/transport failure: nothing was written yet, so just report it.
            print(f"Error while calling Condition API: {e}")
            return set()
        except Exception as e:
            session.rollback()
            print(f"Unexpected error calling Condition API: {e}")
            # Bare raise keeps the original traceback intact.
            raise

    @staticmethod
    def _get_admin_token():
        """Fetch a service-account access token from Keycloak (client credentials grant).

        Reads ``KEYCLOAK_BASE_URL``, ``KEYCLOAK_REALM_NAME``,
        ``KEYCLOAK_SERVICE_ACCOUNT_ID``, ``KEYCLOAK_SERVICE_ACCOUNT_SECRET`` and
        ``CONNECT_TIMEOUT`` (default 60) from the Flask app config.

        Returns:
            The ``access_token`` value from the token response, or ``None``
            when configuration is missing, the request fails, or the response
            body is not valid JSON.
        """
        config = current_app.config
        base_url = config.get("KEYCLOAK_BASE_URL")
        realm = config.get("KEYCLOAK_REALM_NAME")
        admin_client_id = config.get("KEYCLOAK_SERVICE_ACCOUNT_ID")
        admin_secret = config.get("KEYCLOAK_SERVICE_ACCOUNT_SECRET")
        timeout = config.get("CONNECT_TIMEOUT", 60)

        # A missing secret would otherwise be sent as the literal text "None".
        if not base_url or not admin_client_id or not admin_secret:
            print("Keycloak configuration missing.")
            return None

        token_url = f"{base_url}/auth/realms/{realm}/protocol/openid-connect/token"

        headers = {
            "Content-Type": "application/x-www-form-urlencoded"
        }

        # Pass a mapping so requests form-encodes each value; a hand-built
        # "k=v&k=v" string breaks when the secret contains &, +, = or %.
        data = {
            "client_id": admin_client_id,
            "grant_type": "client_credentials",
            "client_secret": admin_secret,
        }

        try:
            response = requests.post(token_url, data=data, headers=headers, timeout=timeout)
            response.raise_for_status()
            return response.json().get("access_token")
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on requests versions whose
            # JSON decode error is not a RequestException.
            print(f"Error while fetching Keycloak token: {e}")
            return None


# diff --git a/jobs/epic-cron/src/epic_cron/services/track_service.py b/jobs/epic-cron/src/epic_cron/services/track_service.py
# index f604ca7..397eb9f 100644
# --- a/jobs/epic-cron/src/epic_cron/services/track_service.py
# +++ b/jobs/epic-cron/src/epic_cron/services/track_service.py
# Hunk "@@ -8,6 +8,28 @@ class TrackService:" adds fetch_proponents to the
# existing class. init_db, MetaData, Table and select are not imported in the
# visible hunk -- presumably they come from track_service.py's existing imports.
class TrackService:
    """Service to interact with the Track API."""

    @staticmethod
    def fetch_proponents():
        """Fetch the id and name of every row in the track ``proponents`` table.

        The table is reflected at call time from the track database
        connection. When ``ENABLE_DETAILED_LOGS`` is truthy in the app config,
        each fetched row is printed.

        Returns:
            The list of result rows (``id``, ``name``) produced by ``fetchall()``.
        """
        print("Fetching proponents from track database...")

        track_session = init_db(current_app)
        with track_session() as session:
            track_metadata = MetaData()
            track_proponents_table = Table('proponents', track_metadata, autoload_with=session.bind)

            print("Selecting all proponents...")
            query = select(track_proponents_table.c.id, track_proponents_table.c.name)
            proponents_data = session.execute(query).fetchall()
            print(f"Number of rows fetched from track.proponents: {len(proponents_data)}")

            debug_logs_enabled = current_app.config.get("ENABLE_DETAILED_LOGS", False)
            if debug_logs_enabled:
                for row in proponents_data:
                    print(f"Fetched proponent: {dict(row._mapping)}")

            return proponents_data

    # Hunk context (body not visible in this diff):
    #     @staticmethod
    #     def fetch_track_data():
    #         """Fetch and log data from the track.projects table, joining with proponents."""


# diff --git a/jobs/epic-cron/tasks/project_extractor.py b/jobs/epic-cron/tasks/project_extractor.py
# index da14635..33b1657 100644
# --- a/jobs/epic-cron/tasks/project_extractor.py
# +++ b/jobs/epic-cron/tasks/project_extractor.py
# @@ -9,6 +9,7 @@ from epic_cron.models.db import init_db, init_submit_db, init_compliance_db, \
#      init_conditions_db  # Function that initializes DB engines
#  from epic_cron.services.track_service import TrackService
# +from tasks.proponent_status_updater import ProponentStatusUpdater
#
#  class
#  TargetSystem(Enum):
# @@ -38,6 +39,10 @@ def do_sync(cls, target_system=TargetSystem.SUBMIT):
#          # Step 3: Insert new records into the target database
#          cls._insert_into_target_db(track_data, target_session, target_model, target_system)
#
# +        # Step 4: Proponent Status Update (Only for SUBMIT)
# +        if target_system == TargetSystem.SUBMIT:
# +            ProponentStatusUpdater.update(target_session, target_model)
# +
#          print(f"Project Extractor for {target_system.value} completed at {datetime.now()}")
#
#      @staticmethod
# @@ -132,7 +137,6 @@ def _insert_into_target_db(track_data, target_session, target_model, target_syst
#                      name=project_dict['name'],
#                      epic_guid=project_dict.get("epic_guid"),
#                      proponent_id=project_dict.get("proponent_id"),
# -                    proponent_name=project_dict.get("proponent_name"),
#                      ea_certificate=project_dict.get("ea_certificate")
#                  )
#              elif target_system == TargetSystem.CONDITIONS:
#
# diff --git a/jobs/epic-cron/tasks/proponent_extractor.py b/jobs/epic-cron/tasks/proponent_extractor.py
# new file mode 100644
# index 0000000..94fd2bb
# --- /dev/null
# +++ b/jobs/epic-cron/tasks/proponent_extractor.py
# (new file; line structure restored and the diff "+" markers dropped)
from datetime import datetime
from flask import current_app
from submit_api.models.proponent import Proponent as SubmitProponentModel
from epic_cron.models.db import init_submit_db
from epic_cron.services.track_service import TrackService


class ProponentExtractor:
    """Task to run EpicTrack Proponent Extraction."""

    @classmethod
    def do_sync(cls):
        """Copy proponents from Track into the Submit database.

        Creates the Submit session factory, fetches all proponents through
        ``TrackService.fetch_proponents`` and reconciles them with
        ``_sync_proponents``.
        """
        print(f"Starting Proponent Extractor at {datetime.now()}")

        # Initialize target database session
        print("Initializing database sessions...")
        target_session = init_submit_db(current_app)

        proponents_data = TrackService.fetch_proponents()
        cls._sync_proponents(proponents_data, target_session, SubmitProponentModel)

        print(f"Proponent Extractor completed at {datetime.now()}")

    @staticmethod
    def _sync_proponents(track_proponents, target_session, target_model):
        """Synchronize proponents between Source (Track) and Target (Submit).

        Strategy: upsert (insert/update) followed by soft delete of target
        rows that are no longer present in the source.

        Args:
            track_proponents: Rows exposing ``_mapping`` with ``id`` and
                ``name`` keys, as returned by ``TrackService.fetch_proponents``.
            target_session: Callable returning a session usable as a context
                manager.
            target_model: Model class with ``id``, ``name`` and ``is_deleted``
                attributes.

        Raises:
            Exception: Any error during the sync is re-raised after rollback.
        """
        print("Syncing proponents into the SUBMIT database...")

        # An empty source would mark every existing proponent as deleted.
        # Treat it as a failed/empty fetch and leave the target untouched.
        if not track_proponents:
            print("WARNING: No proponents received from Track. Skipping sync to avoid soft-deleting all proponents.")
            return

        with target_session() as session:
            try:
                # Load all existing proponents to minimize DB round-trips
                existing_proponents = {p.id: p for p in session.query(target_model).all()}

                source_ids = set()
                count_updated = 0
                count_created = 0

                # 1. Update or create proponents from the source
                for row in track_proponents:
                    data = dict(row._mapping)
                    proponent_id = data["id"]
                    proponent_name = data["name"]

                    source_ids.add(proponent_id)

                    existing_record = existing_proponents.get(proponent_id)

                    if existing_record:
                        # TODO: are there other kinds of proponent updates in Track?
                        # Update if the name changed or if it was previously deleted
                        if existing_record.name != proponent_name or existing_record.is_deleted:
                            existing_record.name = proponent_name
                            existing_record.is_deleted = False
                            count_updated += 1
                    else:
                        new_proponent = target_model(
                            id=proponent_id,
                            name=proponent_name,
                            is_deleted=False
                        )
                        session.add(new_proponent)
                        count_created += 1

                # 2. Soft delete proponents missing from Track (i.e. Track
                #    deleted them)
                count_deleted = 0
                for proponent_id, proponent in existing_proponents.items():
                    if proponent_id not in source_ids and not proponent.is_deleted:
                        print(f"Soft-deleting Proponent ID {proponent_id} (Missing in source)")
                        proponent.is_deleted = True
                        count_deleted += 1

                session.commit()

                print(f"Sync Complete: {count_created} Created, {count_updated} Updated, {count_deleted} Soft-Deleted.")

            except Exception as e:
                session.rollback()
                print(f"*** ERROR SYNCING PROPONENTS: {e} ***")
                # Bare raise keeps the original traceback intact.
                raise


# diff --git a/jobs/epic-cron/tasks/proponent_status_updater.py b/jobs/epic-cron/tasks/proponent_status_updater.py
# new file mode 100644
# index 0000000..d077c82
# --- /dev/null
# +++ b/jobs/epic-cron/tasks/proponent_status_updater.py
# (new file; line structure restored and the diff "+" markers dropped)
from submit_api.models.proponent import Proponent as SubmitProponentModel
from submit_api.enums.proponent_status import ProponentStatus
from epic_cron.services.approved_condition_service import ApprovedConditionService


class ProponentStatusUpdater:
    """Task that updates proponent statuses after a project sync."""

    @classmethod
    def update(cls, session_maker, _=None):
        """Run every proponent status check.

        Args:
            session_maker: Callable returning a session usable as a context
                manager.
            _: Ignored. The visible caller passes the target model here.
        """
        print("Running ProponentStatusUpdater...")
        cls._update_by_conditions(session_maker)

        # Add other status checks here in the future (e.g., financial checks)

    @classmethod
    def _update_by_conditions(cls, session_maker):
        """Set proponents with approved-condition projects to ELIGIBLE.

        Uses ``ApprovedConditionService.sync_approved_conditions`` to obtain
        the proponent IDs, then sets ``status = ProponentStatus.ELIGIBLE`` on
        each matching proponent and commits.

        Best-effort: any error is printed and swallowed, so a failure here
        does not propagate to the caller.
        """
        try:
            with session_maker() as session:
                try:
                    ids = ApprovedConditionService.sync_approved_conditions(session)

                    if not ids:
                        return

                    print(f"Updating {len(ids)} proponents to ELIGIBLE")
                    proponents = session.query(SubmitProponentModel).filter(
                        SubmitProponentModel.id.in_(ids)
                    ).all()

                    # NOTE(review): this overwrites whatever status the
                    # proponent currently has -- confirm that no status beyond
                    # ELIGIBLE can be downgraded by this assignment.
                    for proponent in proponents:
                        proponent.status = ProponentStatus.ELIGIBLE
                    session.commit()
                except Exception:
                    # Discard partial status changes explicitly before the
                    # session is closed.
                    session.rollback()
                    raise
        except Exception as e:
            print(f"Error updating based on conditions: {e}")