Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions jobs/epic-cron/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ class _Config(): # pylint: disable=too-few-public-methods
ENABLE_DETAILED_LOGS = os.getenv("ENABLE_DETAILED_LOGS", "false").lower() == "true"

TRACK_API_BASE_URL=os.getenv('TRACK_API_BASE_URL')
CONDITION_API_BASE_URL = os.getenv("CONDITION_API_BASE_URL")
KEYCLOAK_BASE_URL = os.getenv('KEYCLOAK_BASE_URL')
KEYCLOAK_REALM_NAME = os.getenv('KEYCLOAK_REALM_NAME', 'eao-epic')
KEYCLOAK_SERVICE_ACCOUNT_ID = os.getenv('KEYCLOAK_SERVICE_ACCOUNT_ID')
Expand Down
7 changes: 7 additions & 0 deletions jobs/epic-cron/invoke_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from utils.logger import setup_logging
import config
from tasks.project_extractor import ProjectExtractor, TargetSystem # Import the enum
from tasks.proponent_extractor import ProponentExtractor
from tasks.virus_scanner import VirusScanner

setup_logging(os.path.join(os.path.abspath(os.path.dirname(__file__)), 'logging.conf')) # important to do this first
Expand Down Expand Up @@ -40,6 +41,12 @@ def run(job_name, target_system=None, file_path=None):

with application.app_context():
if job_name == 'EXTRACT_PROJECT':
# For SUBMIT, we must sync proponents first as they are dependencies
if target_system == TargetSystem.SUBMIT:
print(f'Running Proponent Extractor for {target_system.value}...')
ProponentExtractor.do_sync()
application.logger.info(f'<<<< Completed Proponent Sync for {target_system.value} >>>')

print(f'Running Project Extractor for {target_system.value}...')
ProjectExtractor.do_sync(target_system=target_system)
application.logger.info(f'<<<< Completed Project Sync for {target_system.value} >>>')
Expand Down
122 changes: 122 additions & 0 deletions jobs/epic-cron/src/epic_cron/services/approved_condition_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import requests
from flask import current_app
from submit_api.models.project import Project as SubmitProjectModel

class ApprovedConditionService:
    """Service to interact with the Condition API."""

    @staticmethod
    def sync_approved_conditions(session):
        """
        Flag local projects that have approved conditions in the Condition API.

        Fetches the list of projects with approved conditions from the
        Condition API, sets ``has_approved_condition`` on every Submit project
        whose ``epic_guid`` appears in that list, and commits the session.

        Proponent status is NOT changed here. The IDs of the proponents that
        own the matched projects are returned so the caller can update them.

        :param session: open session used to query and update Submit projects.
        :return: set of proponent IDs owning at least one matched project.
            The set is empty when the sync is skipped (missing configuration,
            no token, HTTP failure) or when the API returns no usable GUIDs.
        :raises Exception: any non-HTTP failure is re-raised after the session
            has been rolled back.
        """
        condition_api_base_url = current_app.config.get("CONDITION_API_BASE_URL")
        if not condition_api_base_url:
            print("WARNING: CONDITION_API_BASE_URL not configured. Skipping Condition API sync.")
            return set()

        approved_projects_endpoint = f"{condition_api_base_url}/api/projects/with-approved-conditions"

        token = ApprovedConditionService._get_admin_token()
        if not token:
            print("Failed to fetch authorization token for Condition API.")
            return set()

        headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json"
        }

        print(f"Fetching projects from Condition API: {approved_projects_endpoint}")
        try:
            # Make the GET request to the Condition API with Authorization
            response = requests.get(approved_projects_endpoint, headers=headers, timeout=30)
            response.raise_for_status()
            projects_data = response.json()

            print(f"Condition API returned {len(projects_data)} projects.")

            # Keep only entries that actually carry a GUID.
            epic_guids = [
                guid
                for guid in (project.get("epic_guid") for project in projects_data)
                if guid
            ]
            if not epic_guids:
                return set()

            # Update Projects
            print(f"Updating projects matching {len(epic_guids)} GUIDs...")
            projects_to_update = session.query(SubmitProjectModel).filter(
                SubmitProjectModel.epic_guid.in_(epic_guids)
            ).all()

            proponent_ids_to_update = set()
            updated_count = 0

            for project in projects_to_update:
                if not project.has_approved_condition:
                    project.has_approved_condition = True
                    updated_count += 1

                # Collect proponent IDs for the status update: if at least one
                # of a proponent's projects has an approved condition, the
                # proponent is eligible for onboarding.
                if project.proponent_id:
                    proponent_ids_to_update.add(project.proponent_id)

            print(f"Updated has_approved_condition for {updated_count} projects.")

            # Proponents are intentionally not updated here; the collected IDs
            # are handed back so a separate task can apply the status change.
            session.commit()
            print(f"Successfully synced approved conditions. Returning {len(proponent_ids_to_update)} proponent IDs.")

            return proponent_ids_to_update

        except requests.RequestException as e:
            # HTTP/transport failures are treated as "nothing to sync".
            print(f"Error while calling Condition API: {e}")
            return set()
        except Exception as e:
            session.rollback()
            print(f"Unexpected error calling Condition API: {e}")
            # Bare raise keeps the original traceback intact.
            raise

    @staticmethod
    def _get_admin_token():
        """
        Fetch an admin token using client credentials from Keycloak.

        Reads the Keycloak settings from the Flask app config and performs a
        ``client_credentials`` grant against the realm's token endpoint.

        :return: the ``access_token`` string from the response, or ``None``
            when configuration is incomplete or the request fails.
        """
        # Get Keycloak configuration from Flask app config
        config = current_app.config
        base_url = config.get("KEYCLOAK_BASE_URL")
        realm = config.get("KEYCLOAK_REALM_NAME")
        admin_client_id = config.get("KEYCLOAK_SERVICE_ACCOUNT_ID")
        admin_secret = config.get("KEYCLOAK_SERVICE_ACCOUNT_SECRET")
        timeout = config.get("CONNECT_TIMEOUT", 60)

        # Every one of these is interpolated into the request; a missing value
        # would otherwise be sent as the literal string "None".
        if not all((base_url, realm, admin_client_id, admin_secret)):
            print("Keycloak configuration missing.")
            return None

        # Construct token URL and headers
        token_url = f"{base_url}/auth/realms/{realm}/protocol/openid-connect/token"

        headers = {
            "Content-Type": "application/x-www-form-urlencoded"
        }

        # Passing a dict lets requests form-encode the values, so secrets
        # containing '&', '=', '+' or '%' are transmitted correctly.
        data = {
            "client_id": admin_client_id,
            "grant_type": "client_credentials",
            "client_secret": admin_secret,
        }

        try:
            response = requests.post(token_url, data=data, headers=headers, timeout=timeout)
            response.raise_for_status()

            # Parse and return the access token
            return response.json().get("access_token")

        except (requests.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on requests versions where the
            # decode error is not a RequestException subclass.
            print(f"Error while fetching Keycloak token: {e}")
            return None
22 changes: 22 additions & 0 deletions jobs/epic-cron/src/epic_cron/services/track_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,28 @@
class TrackService:
"""Service to interact with the Track API."""

@staticmethod
def fetch_proponents():
    """
    Read the id and name of every row in the track ``proponents`` table.

    Prints the number of rows fetched, and additionally prints each row when
    the ``ENABLE_DETAILED_LOGS`` app setting is truthy.

    :return: the list of rows produced by ``fetchall()``.
    """
    print("Fetching proponents from track database...")

    session_factory = init_db(current_app)
    with session_factory() as db:
        # Reflect the table definition from the connected database.
        proponents = Table('proponents', MetaData(), autoload_with=db.bind)

        print("Selecting all proponents...")
        columns = proponents.c
        rows = db.execute(select(columns.id, columns.name)).fetchall()
        print(f"Number of rows fetched from track.proponents: {len(rows)}")

        if current_app.config.get("ENABLE_DETAILED_LOGS", False):
            for record in rows:
                print(f"Fetched proponent: {dict(record._mapping)}")

        return rows

@staticmethod
def fetch_track_data():
"""Fetch and log data from the track.projects table, joining with proponents."""
Expand Down
6 changes: 5 additions & 1 deletion jobs/epic-cron/tasks/project_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from epic_cron.models.db import init_db, init_submit_db, init_compliance_db, \
init_conditions_db # Function that initializes DB engines
from epic_cron.services.track_service import TrackService
from tasks.proponent_status_updater import ProponentStatusUpdater


class TargetSystem(Enum):
Expand Down Expand Up @@ -38,6 +39,10 @@ def do_sync(cls, target_system=TargetSystem.SUBMIT):
# Step 3: Insert new records into the target database
cls._insert_into_target_db(track_data, target_session, target_model, target_system)

# Step 4: Proponent Status Update (Only for SUBMIT)
if target_system == TargetSystem.SUBMIT:
ProponentStatusUpdater.update(target_session, target_model)

print(f"Project Extractor for {target_system.value} completed at {datetime.now()}")

@staticmethod
Expand Down Expand Up @@ -132,7 +137,6 @@ def _insert_into_target_db(track_data, target_session, target_model, target_syst
name=project_dict['name'],
epic_guid=project_dict.get("epic_guid"),
proponent_id=project_dict.get("proponent_id"),
proponent_name=project_dict.get("proponent_name"),
ea_certificate=project_dict.get("ea_certificate")
)
elif target_system == TargetSystem.CONDITIONS:
Expand Down
83 changes: 83 additions & 0 deletions jobs/epic-cron/tasks/proponent_extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
from datetime import datetime
from flask import current_app
from submit_api.models.proponent import Proponent as SubmitProponentModel
from epic_cron.models.db import init_submit_db
from epic_cron.services.track_service import TrackService


class ProponentExtractor:
    """Task to run EpicTrack Proponent Extraction."""

    @classmethod
    def do_sync(cls):
        """Fetch proponents from Track and sync them into the Submit database."""
        print(f"Starting Proponent Extractor at {datetime.now()}")

        # Initialize target database session
        print("Initializing database sessions...")
        target_session = init_submit_db(current_app)

        proponents_data = TrackService.fetch_proponents()
        cls._sync_proponents(proponents_data, target_session, SubmitProponentModel)

        print(f"Proponent Extractor completed at {datetime.now()}")

    @staticmethod
    def _sync_proponents(track_proponents, target_session, target_model):
        """
        Synchronize proponents between Source (Track) and Target (Submit).

        Strategy: Upsert (Insert/Update) + Soft Delete.

        :param track_proponents: iterable of source rows; each row must expose
            a ``_mapping`` with ``id`` and ``name`` keys.
        :param target_session: callable returning a session usable as a
            context manager.
        :param target_model: model class constructed with ``id``, ``name`` and
            ``is_deleted`` keyword arguments.
        :raises Exception: any failure is re-raised after the session has been
            rolled back.
        """
        print("Syncing proponents into the SUBMIT database...")

        with target_session() as session:
            try:
                # Load all existing proponents to minimize DB round-trips
                existing_proponents = {p.id: p for p in session.query(target_model).all()}

                source_ids = set()
                count_updated = 0
                count_created = 0

                # 1. Update or create proponents from the source
                for row in track_proponents:
                    data = dict(row._mapping)
                    proponent_id = data["id"]
                    proponent_name = data["name"]

                    source_ids.add(proponent_id)

                    existing_record = existing_proponents.get(proponent_id)

                    if existing_record is not None:
                        # TODO: are there other proponent changes in Track that need syncing?
                        # Update if the name changed or if it was previously soft-deleted
                        if existing_record.name != proponent_name or existing_record.is_deleted:
                            existing_record.name = proponent_name
                            existing_record.is_deleted = False
                            count_updated += 1
                    else:
                        session.add(target_model(
                            id=proponent_id,
                            name=proponent_name,
                            is_deleted=False
                        ))
                        count_created += 1

                # 2. Soft-delete proponents that no longer exist in Track.
                # NOTE(review): an empty source result soft-deletes every
                # proponent; confirm that is acceptable if the source fetch
                # can ever return no rows by mistake.
                count_deleted = 0
                for proponent_id, proponent in existing_proponents.items():
                    if proponent_id not in source_ids and not proponent.is_deleted:
                        print(f"Soft-deleting Proponent ID {proponent_id} (Missing in source)")
                        proponent.is_deleted = True
                        count_deleted += 1

                session.commit()

                print(f"Sync Complete: {count_created} Created, {count_updated} Updated, {count_deleted} Soft-Deleted.")

            except Exception as e:
                session.rollback()
                print(f"*** ERROR SYNCING PROPONENTS: {e} ***")
                # Bare raise keeps the original traceback intact.
                raise
31 changes: 31 additions & 0 deletions jobs/epic-cron/tasks/proponent_status_updater.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from submit_api.models.proponent import Proponent as SubmitProponentModel
from submit_api.enums.proponent_status import ProponentStatus
from epic_cron.services.approved_condition_service import ApprovedConditionService

class ProponentStatusUpdater:
    """Applies proponent status changes derived from other synced data."""

    @classmethod
    def update(cls, session_maker, _=None):
        """
        Run every proponent status check.

        :param session_maker: callable returning a session usable as a
            context manager.
        :param _: accepted and ignored.
        """
        print("Running ProponentStatusUpdater...")
        cls._update_by_conditions(session_maker)

        # Add other status checks here in the future (e.g., financial checks)

    @classmethod
    def _update_by_conditions(cls, session_maker):
        """
        Mark proponents returned by the approved-condition sync as ELIGIBLE.

        Any exception is printed and swallowed, so a failure here does not
        propagate to the caller.
        """
        try:
            with session_maker() as db:
                eligible_ids = ApprovedConditionService.sync_approved_conditions(db)

                if eligible_ids:
                    print(f"Updating {len(eligible_ids)} proponents to ELIGIBLE")
                    id_filter = SubmitProponentModel.id.in_(eligible_ids)

                    # NOTE(review): status is overwritten regardless of its
                    # current value; confirm no other status should be kept.
                    for record in db.query(SubmitProponentModel).filter(id_filter).all():
                        record.status = ProponentStatus.ELIGIBLE
                    db.commit()
        except Exception as err:
            print(f"Error updating based on conditions: {err}")
Loading