Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions dags/dbt_orchestration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from client_list import clients
from airflow.decorators import dag, task
from airflow.models import Variable
from data_utils.github_helper import get_github_token, trigger_workflow
from data_utils.alerting.alerting import task_failed
import pendulum

# Common variables
github_token = get_github_token()
env = Variable.get("environment")
default_args = {"owner": "airflow", "start_date": pendulum.datetime(2024, 11, 11, tz="UTC")}


def create_dbt_orchestration_dag(client_name):
@dag(
dag_id=f"dbt_orchestration_{client_name}", # Ensure unique dag_id
default_args=default_args,
schedule=None,
catchup=False,
on_failure_callback=task_failed
)
def dbt_orchestration():

# Define GitHub action trigger
@task(task_id=f'trigger_github_action_{client_name}')
def trigger_github_action():
"""
Trigger a GitHub action for the client.
"""
trigger_workflow(f'{client_name}_models_run_{env}.yml', token=github_token)


# Trigger GitHub action
github_action = trigger_github_action()

github_action

return dbt_orchestration()


enabled = Variable.get("dbt_orchestration_enabled")

if enabled == "True":
# Dynamically generate DAGs for all clients
for client in clients:
globals()[f"dbt_orchestration_{client}"] = create_dbt_orchestration_dag(client_name=client)
18 changes: 18 additions & 0 deletions dags/meta_dbt_orchestration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import pendulum

from client_list import clients
from data_utils.dags_utils.orchestration_utils import create_orchestration_dag

default_args = {
'owner': 'airflow',
'retries': 1,
}

dag_id = 'meta_dbt_orchestration'
description = 'Orchestrator DAG for managing client GA DAGs sequentially'
schedule_interval = None # Trigger manually
start_date = pendulum.datetime(2024, 11, 11, tz="UTC")

dbt_orchestration_dags = clients_prefixed = [f"dbt_orchestration_{client}" for client in clients]

dag = create_orchestration_dag(dag_id, description, schedule_interval, start_date, dbt_orchestration_dags)