diff --git a/dags/dbt_orchestration.py b/dags/dbt_orchestration.py new file mode 100644 index 0000000..5c217c6 --- /dev/null +++ b/dags/dbt_orchestration.py @@ -0,0 +1,46 @@ +from client_list import clients +from airflow.decorators import dag, task +from airflow.models import Variable +from data_utils.github_helper import get_github_token, trigger_workflow +from data_utils.alerting.alerting import task_failed +import pendulum + +# Common variables +github_token = get_github_token() +env = Variable.get("environment") +default_args = {"owner": "airflow", "start_date": pendulum.datetime(2024, 11, 11, tz="UTC")} + + +def create_dbt_orchestration_dag(client_name): + @dag( + dag_id=f"dbt_orchestration_{client_name}", # Ensure unique dag_id + default_args=default_args, + schedule=None, + catchup=False, + on_failure_callback=task_failed + ) + def dbt_orchestration(): + + # Define GitHub action trigger + @task(task_id=f'trigger_github_action_{client_name}') + def trigger_github_action(): + """ + Trigger a GitHub action for the client. + """ + trigger_workflow(f'{client_name}_models_run_{env}.yml', token=github_token) + + + # Trigger GitHub action + github_action = trigger_github_action() + + github_action + + return dbt_orchestration() + + +enabled = Variable.get("dbt_orchestration_enabled") + +if enabled == "True": + # Dynamically generate DAGs for all clients + for client in clients: + globals()[f"dbt_orchestration_{client}"] = create_dbt_orchestration_dag(client_name=client) diff --git a/dags/meta_dbt_orchestration.py b/dags/meta_dbt_orchestration.py new file mode 100644 index 0000000..127481f --- /dev/null +++ b/dags/meta_dbt_orchestration.py @@ -0,0 +1,18 @@ +import pendulum + +from client_list import clients +from data_utils.dags_utils.orchestration_utils import create_orchestration_dag + +default_args = { + 'owner': 'airflow', + 'retries': 1, +} + +dag_id = 'meta_dbt_orchestration' +description = 'Orchestrator DAG for managing client GA DAGs sequentially' +schedule_interval = None # Trigger manually +start_date = pendulum.datetime(2024, 11, 11, tz="UTC") + +dbt_orchestration_dags = clients_prefixed = [f"dbt_orchestration_{client}" for client in clients] + +dag = create_orchestration_dag(dag_id, description, schedule_interval, start_date, dbt_orchestration_dags)