diff --git a/dags/cdc_grist_sync.py b/dags/cdc_grist_sync.py
new file mode 100644
index 0000000..78a2663
--- /dev/null
+++ b/dags/cdc_grist_sync.py
@@ -0,0 +1,25 @@
+import pendulum
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+
+from data_utils.alerting.alerting import task_failed
+from data_utils.grist.grist_push import fetch_and_dump_cdc_data
+from clients import clients
+
+# Airflow connection holding the main Postgres cluster credentials.
+connection_name = "main_db_cluster_name"
+
+with DAG(
+    dag_id='cdc_grist_sync',
+    default_args={'owner': 'airflow'},
+    schedule='45 0 * * *',
+    start_date=pendulum.datetime(2024, 11, 15, tz="UTC"),
+    catchup=False,
+) as dag:
+    # Single task: pull CDC proposals from Postgres and push them to Grist.
+    dump_grist_data = PythonOperator(
+        task_id='fetch_and_dump_cdc_data',
+        python_callable=fetch_and_dump_cdc_data,
+        op_args=[connection_name, clients],
+        on_failure_callback=task_failed,
+    )
diff --git a/dags/data_utils/grist/grist_push.py b/dags/data_utils/grist/grist_push.py
new file mode 100644
index 0000000..b7b4003
--- /dev/null
+++ b/dags/data_utils/grist/grist_push.py
@@ -0,0 +1,78 @@
+from airflow.hooks.base import BaseHook
+from airflow.models import Variable
+from grist_api import GristDocAPI
+from sqlalchemy import text
+
+from ..postgres_helper.postgres_helper import get_postgres_connection
+
+table_name = "Propositions_brutes"
+
+
+def _get_grist_api():
+    """Build a Grist API client from the Airflow connection and Variable.
+
+    Resolved lazily (at task run time) rather than at module import time:
+    top-level calls to BaseHook.get_connection / Variable.get would hit the
+    Airflow metadata DB on every DAG-file parse and break DAG import when
+    the connection or variable is missing.
+    """
+    connection = BaseHook.get_connection("grist_osp")
+    grist_cdc_doc_id = Variable.get("grist_cdc_doc_id")
+    return GristDocAPI(
+        grist_cdc_doc_id,
+        server=connection.host,
+        api_key=connection.password,
+    )
+
+
+def retrieve_sql_data(engine):
+    """Fetch all rows from prod.proposals, ordered by id."""
+    query = """
+    SELECT
+        id,
+        decidim_participatory_space_slug,
+        title,
+        body,
+        url,
+        translated_state,
+        first_category,
+        comments_count,
+        endorsements_count,
+        CURRENT_TIMESTAMP(3) as imported_at
+    FROM prod.proposals
+    ORDER BY id
+    """
+    # The context manager closes the connection; no explicit close() needed.
+    with engine.connect() as conn:
+        result = conn.execute(text(query))
+        rows_to_dump = result.all()
+
+    return rows_to_dump
+
+
+def dump_to_grist(rows_to_dump):
+    """Sync the fetched rows into the Grist table, keyed on proposal id."""
+    key_cols = [["proposal_id", "id", "Numeric"]]
+    other_cols = [
+        ("decidim_participatory_space_slug", "decidim_participatory_space_slug", "Text"),
+        ("title", "title", "Text"),
+        ("body", "body", "Text"),
+        ("url", "url", "Text"),
+        ("translated_state", "translated_state", "Text"),
+        ("category", "first_category", "Text"),
+        ("comments_count", "comments_count", "Numeric"),
+        ("endorsements_count", "endorsements_count", "Numeric"),
+        ("imported_at", "imported_at", "DateTime"),
+    ]
+    api = _get_grist_api()
+    api.sync_table(table_name, rows_to_dump, key_cols, other_cols,
+                   grist_fetch=None, chunk_size=200, filters=None)
+
+
+def fetch_and_dump_cdc_data(connection_name, clients):
+    """Fetch CDC proposals from Postgres and push them to Grist."""
+    client = "cdc"
+    db_name = clients[client]["postgres"]["database_name"]
+    engine = get_postgres_connection(connection_name, db_name)
+    rows_to_dump = retrieve_sql_data(engine)
+    dump_to_grist(rows_to_dump)
diff --git a/dags/data_utils/postgres_helper/postgres_helper.py b/dags/data_utils/postgres_helper/postgres_helper.py
index 66ca8ad..01f1533 100644
--- a/dags/data_utils/postgres_helper/postgres_helper.py
+++ b/dags/data_utils/postgres_helper/postgres_helper.py
@@ -1,23 +1,24 @@
-from sqlalchemy import create_engine, text
+from sqlalchemy import create_engine, URL, text
 from airflow.hooks.base import BaseHook
 import logging
-
-
 def get_postgres_connection(connection_name, database):
     """Extracts PostgreSQL connection details from Airflow and establishes a connection."""
     try:
         # Retrieve the connection object using Airflow's BaseHook
         connection = BaseHook.get_connection(connection_name)
-        # Extract connection details
-        user = connection.login
-        password = connection.password
-        host = connection.host
-        port = connection.port
+        url_object = URL.create(
+            "postgresql",
+            username = connection.login,
+            password = connection.password,
+            host = connection.host,
+            port = connection.port,
+            database = database,
+        )
         # Create the SQLAlchemy engine
-        return create_engine(f"postgresql://{user}:{password}@{host}:{port}/{database}")
+        return create_engine(url_object)
     except Exception as e: