From 250d87f595bf15ce5f1608e94b91c6825e0cced2 Mon Sep 17 00:00:00 2001
From: ailepet
Date: Wed, 16 Jul 2025 16:30:58 +0200
Subject: [PATCH 1/8] wip

---
 dags/cdc_grist_sync.py              | 26 ++++++++++++++
 dags/data_utils/grist/grist_push.py | 56 +++++++++++++++++++++++++++++
 2 files changed, 82 insertions(+)
 create mode 100644 dags/cdc_grist_sync.py
 create mode 100644 dags/data_utils/grist/grist_push.py

diff --git a/dags/cdc_grist_sync.py b/dags/cdc_grist_sync.py
new file mode 100644
index 0000000..191cb17
--- /dev/null
+++ b/dags/cdc_grist_sync.py
@@ -0,0 +1,26 @@
+import pendulum
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+from data_utils.alerting.alerting import task_failed
+from data_utils.grist.grist_push import fetch_and_dump_data
+from clients import clients
+
+connection_name="main_db_cluster_name"
+
+with DAG(
+    dag_id='cdc_grist_sync',
+    default_args={'owner': 'airflow'},
+    schedule='45 0 * * *',
+    start_date=pendulum.datetime(2024, 11, 15, tz="UTC"),
+    catchup=False
+) as dag:
+
+    dump_grist_data = PythonOperator(
+        task_id='fetch_and_dump_cdc_data',
+        python_callable=fetch_and_dump_data,
+        op_args=[f"{connection_name}", clients],
+        dag=dag,
+        on_failure_callback=task_failed,
+    )
+
+    dump_grist_data
\ No newline at end of file

diff --git a/dags/data_utils/grist/grist_push.py b/dags/data_utils/grist/grist_push.py
new file mode 100644
index 0000000..439b066
--- /dev/null
+++ b/dags/data_utils/grist/grist_push.py
@@ -0,0 +1,56 @@
+from airflow.hooks.base import BaseHook
+from grist_api import GristDocAPI
+import pandas as pd
+from ..postgres_helper.postgres_helper import get_postgres_connection
+from airflow.models import Variable
+
+# Retrieve the connection object using Airflow's BaseHook
+connection = BaseHook.get_connection("grist_osp")
+grist_api_key = connection.password
+grist_server = connection.host
+grist_cdc_doc_id = Variable.get("grist_cdc_doc_id")
+
+# Get api key from your Profile Settings, and run with GRIST_API_KEY=<key>
+api = GristDocAPI(grist_cdc_doc_id, server=grist_server, api_key=grist_api_key)
+
+table_name = "Propositions_brutes"
+
+def retrieve_sql_data(connection):
+    query = f"""
+        SELECT
+            *
+        FROM prod.proposals
+        ORDER BY id DESC
+    """
+    df = pd.read_sql(query, connection)
+    connection.close()
+
+    return df
+
+def dump_to_grist(df):
+    #record_ids = (get a list of record ids from the existing Grist table)
+    #api.delete_records(table_name, record_ids, chunk_size=None)
+    #record_dicts = df.to_dict()
+    #api.add_records(table_name, record_dicts, chunk_size=None)
+
+    #new data should be list of objects with column IDs as attributes
+    #(e.g. namedtuple or sqlalchemy result rows)
+    new_data : pd.to_sql(df)
+    api.sync_table(table_name, new_data, key_cols="id", other_cols, grist_fetch=None, chunk_size=None, filters=None)
+
+    #json_data = df.to_json()
+    #call(url="https://{grist_server}/api/docs/{grist_cdc_doc_id}/tables/{table_name}/columns?replaceall=True", json_data, method="PUT", prefix=None)[
+
+
+def fetch_and_dump_data(clients):
+
+    client = "cdc"
+    db_name = clients[client]["postgres"]["database_name"]
+
+    engine = get_postgres_connection("main_db_cluster_name", db_name)
+    connection = engine.connect()
+
+    df = retrieve_sql_data(connection)
+
+    dump_to_grist(df)
+
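
Note on the wip dump_to_grist above: "new_data : pd.to_sql(df)" is a placeholder rather than a real call; to_sql is a DataFrame method that writes *to* a database, not a converter. What sync_table needs, per the inline comment, is a list of objects exposing column IDs as attributes. A minimal sketch of that conversion, with a hypothetical helper name:

    import pandas as pd

    def df_to_records(df: pd.DataFrame):
        # itertuples(index=False) yields one namedtuple per row, with the
        # DataFrame's column names as attributes -- the "column IDs as
        # attributes" shape the comment asks for.
        return list(df.itertuples(index=False))

    # usage sketch:
    #   df = retrieve_sql_data(connection)
    #   api.sync_table(table_name, df_to_records(df), key_cols, other_cols)
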
From 25d9cf2f498a87de33e906531952ae15629b833b Mon Sep 17 00:00:00 2001
From: ailepet
Date: Wed, 16 Jul 2025 16:35:21 +0200
Subject: [PATCH 2/8] wip

---
 dags/data_utils/grist/grist_push.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dags/data_utils/grist/grist_push.py b/dags/data_utils/grist/grist_push.py
index 439b066..3c26aa2 100644
--- a/dags/data_utils/grist/grist_push.py
+++ b/dags/data_utils/grist/grist_push.py
@@ -39,7 +39,7 @@ def dump_to_grist(df):
     api.sync_table(table_name, new_data, key_cols="id", other_cols, grist_fetch=None, chunk_size=None, filters=None)
 
     #json_data = df.to_json()
-    #call(url="https://{grist_server}/api/docs/{grist_cdc_doc_id}/tables/{table_name}/columns?replaceall=True", json_data, method="PUT", prefix=None)[
+    #call(url="https://{grist_server}/api/docs/{grist_cdc_doc_id}/tables/{table_name}/records?allow_empty_require=True", json_data, method="PUT", prefix=None)[

From 0939479194abb5e7e28c0f95096c553e792f196f Mon Sep 17 00:00:00 2001
From: ailepet
Date: Wed, 16 Jul 2025 16:54:00 +0200
Subject: [PATCH 3/8] wip

---
 dags/data_utils/grist/grist_push.py | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)

diff --git a/dags/data_utils/grist/grist_push.py b/dags/data_utils/grist/grist_push.py
index 3c26aa2..cfa1789 100644
--- a/dags/data_utils/grist/grist_push.py
+++ b/dags/data_utils/grist/grist_push.py
@@ -35,8 +35,19 @@ def dump_to_grist(df):
 
     #new data should be list of objects with column IDs as attributes
     #(e.g. namedtuple or sqlalchemy result rows)
-    new_data : pd.to_sql(df)
-    api.sync_table(table_name, new_data, key_cols="id", other_cols, grist_fetch=None, chunk_size=None, filters=None)
+    new_data = pd.to_sql(df)
+    key_cols = [["id", "id"[,"Numeric"]]]
+    other_cols = [
+        ["decidim_participatory_space_slug", "decidim_participatory_space_slug"[,"Text"]]
+        ["title", "title"[,"Text"]],
+        ["body", "body"[,"Text"]],
+        ["url", "url"[,"Text"]],
+        ["translated_state", "translated_state"[,"Text"]],
+        ["categories", "categories"[,"Text"]],
+        ["comments_count", "comments_count"[,"Numeric"]],
+        ["endorsements_count", "endorsements_count"[,"Numeric"]]
+    ]
+    api.sync_table(table_name, new_data, key_cols, other_cols, grist_fetch=None, chunk_size=None, filters=None)
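
Note on the key_cols/other_cols hunk above: the [,"Numeric"] and [,"Text"] fragments are optional-argument notation carried over from the grist_api docs, not valid Python; the next patch resolves them into plain triples. The intended column-spec shape, as a sketch:

    # Each spec is (grist_col_id, source_attribute[, grist_type]);
    # grist_api treats the type element as optional.
    key_cols = [("id", "id", "Numeric")]
    other_cols = [
        ("title", "title", "Text"),
        ("comments_count", "comments_count", "Numeric"),
    ]
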
From 5649f0d616eb0171a1c0f25736449c89569d6700 Mon Sep 17 00:00:00 2001
From: ailepet
Date: Fri, 18 Jul 2025 15:09:42 +0200
Subject: [PATCH 4/8] first completed implementation

---
 dags/cdc_grist_sync.py                 |  6 +-
 dags/data_utils/grist/grist_push.py    | 69 +++++++++----------
 .../postgres_helper/postgres_helper.py |  2 -
 3 files changed, 35 insertions(+), 42 deletions(-)

diff --git a/dags/cdc_grist_sync.py b/dags/cdc_grist_sync.py
index 191cb17..78a2663 100644
--- a/dags/cdc_grist_sync.py
+++ b/dags/cdc_grist_sync.py
@@ -2,7 +2,7 @@
 from airflow import DAG
 from airflow.operators.python import PythonOperator
 from data_utils.alerting.alerting import task_failed
-from data_utils.grist.grist_push import fetch_and_dump_data
+from data_utils.grist.grist_push import fetch_and_dump_cdc_data
 from clients import clients
 
 connection_name="main_db_cluster_name"
@@ -17,8 +17,8 @@
 
     dump_grist_data = PythonOperator(
         task_id='fetch_and_dump_cdc_data',
-        python_callable=fetch_and_dump_data,
-        op_args=[f"{connection_name}", clients],
+        python_callable=fetch_and_dump_cdc_data,
+        op_args=[connection_name, clients],
         dag=dag,
         on_failure_callback=task_failed,
     )

diff --git a/dags/data_utils/grist/grist_push.py b/dags/data_utils/grist/grist_push.py
index cfa1789..eae4cc2 100644
--- a/dags/data_utils/grist/grist_push.py
+++ b/dags/data_utils/grist/grist_push.py
@@ -1,6 +1,6 @@
 from airflow.hooks.base import BaseHook
 from grist_api import GristDocAPI
-import pandas as pd
+from sqlalchemy import text
 from ..postgres_helper.postgres_helper import get_postgres_connection
 from airflow.models import Variable
@@ -15,53 +15,48 @@
 
 table_name = "Propositions_brutes"
 
-def retrieve_sql_data(connection):
+def retrieve_sql_data(engine):
     query = f"""
         SELECT
-            *
+            id,
+            decidim_participatory_space_slug,
+            title,
+            body,
+            url,
+            translated_state,
+            categories,
+            comments_count,
+            endorsements_count
         FROM prod.proposals
         ORDER BY id DESC
     """
-    df = pd.read_sql(query, connection)
+    with engine.connect() as connection:
+        result = connection.execute(text(query))
+        rows_to_dump = result.all()
     connection.close()
 
-    return df
+    return rows_to_dump
 
-def dump_to_grist(df):
-    #record_ids = (get a list of record ids from the existing Grist table)
-    #api.delete_records(table_name, record_ids, chunk_size=None)
-    #record_dicts = df.to_dict()
-    #api.add_records(table_name, record_dicts, chunk_size=None)
-
-    #new data should be list of objects with column IDs as attributes
-    #(e.g. namedtuple or sqlalchemy result rows)
-    new_data = pd.to_sql(df)
-    key_cols = [["id", "id"[,"Numeric"]]]
+def dump_to_grist(rows_to_dump):
+    new_data = rows_to_dump
+    key_cols = [["proposal_id", "id", "Numeric"]]
     other_cols = [
-        ["decidim_participatory_space_slug", "decidim_participatory_space_slug"[,"Text"]]
-        ["title", "title"[,"Text"]],
-        ["body", "body"[,"Text"]],
-        ["url", "url"[,"Text"]],
-        ["translated_state", "translated_state"[,"Text"]],
-        ["categories", "categories"[,"Text"]],
-        ["comments_count", "comments_count"[,"Numeric"]],
-        ["endorsements_count", "endorsements_count"[,"Numeric"]]
+        ("decidim_participatory_space_slug", "decidim_participatory_space_slug", "Text"),
+        ("title", "title", "Text"),
+        ("body", "body", "Text"),
+        ("url", "url", "Text"),
+        ("translated_state", "translated_state", "Text"),
+        ("category", "first_category", "Text"),
+        ("comments_count", "comments_count", "Numeric"),
+        ("endorsements_count", "endorsements_count", "Numeric"),
+        ("imported_at", "imported_at", "DateTime")
     ]
-    api.sync_table(table_name, new_data, key_cols, other_cols, grist_fetch=None, chunk_size=None, filters=None)
+    api.sync_table(table_name, new_data, key_cols, other_cols, grist_fetch=None, chunk_size=200, filters=None)
 
-    #json_data = df.to_json()
-    #call(url="https://{grist_server}/api/docs/{grist_cdc_doc_id}/tables/{table_name}/records?allow_empty_require=True", json_data, method="PUT", prefix=None)[
-
-
-def fetch_and_dump_data(clients):
+def fetch_and_dump_cdc_data(clients):
 
     client = "cdc"
     db_name = clients[client]["postgres"]["database_name"]
-
-    engine = get_postgres_connection("main_db_cluster_name", db_name)
-    connection = engine.connect()
-
-    df = retrieve_sql_data(connection)
-
-    dump_to_grist(df)
-
+    engine = get_postgres_connection(connection_name, db_name)
+    rows_to_dump = retrieve_sql_data(engine)
+    dump_to_grist(rows_to_dump)
\ No newline at end of file

diff --git a/dags/data_utils/postgres_helper/postgres_helper.py b/dags/data_utils/postgres_helper/postgres_helper.py
index 66ca8ad..8030a11 100644
--- a/dags/data_utils/postgres_helper/postgres_helper.py
+++ b/dags/data_utils/postgres_helper/postgres_helper.py
@@ -2,8 +2,6 @@
 from airflow.hooks.base import BaseHook
 import logging
 
-
-
 def get_postgres_connection(connection_name, database):
     """Extracts PostgreSQL connection details from Airflow and establishes a connection."""
     try:
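
Note on retrieve_sql_data above: result.all() returns SQLAlchemy Row objects, which expose each selected column label as an attribute (row.id, row.title, ...), exactly the record shape sync_table consumes, so the pandas round-trip is gone. The trailing connection.close() is redundant: the "with engine.connect()" block already closes the connection on exit. A small illustration, assuming a reachable database with these columns:

    from sqlalchemy import create_engine, text

    engine = create_engine("postgresql://user:pass@localhost:5432/db")  # placeholder DSN
    with engine.connect() as connection:
        rows = connection.execute(text("SELECT id, title FROM prod.proposals")).all()

    for row in rows[:3]:
        # Row behaves like a namedtuple: attribute access by column label.
        print(row.id, row.title)
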
From 07202051ba624712a4754f45fc64ad8a03f4236c Mon Sep 17 00:00:00 2001
From: ailepet
Date: Fri, 18 Jul 2025 16:10:34 +0200
Subject: [PATCH 5/8] fix task

---
 dags/data_utils/grist/grist_push.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dags/data_utils/grist/grist_push.py b/dags/data_utils/grist/grist_push.py
index eae4cc2..e0f5d40 100644
--- a/dags/data_utils/grist/grist_push.py
+++ b/dags/data_utils/grist/grist_push.py
@@ -53,7 +53,7 @@ def dump_to_grist(rows_to_dump):
     ]
     api.sync_table(table_name, new_data, key_cols, other_cols, grist_fetch=None, chunk_size=200, filters=None)
 
-def fetch_and_dump_cdc_data(clients):
+def fetch_and_dump_cdc_data(connection_name, clients):
 
     client = "cdc"
     db_name = clients[client]["postgres"]["database_name"]

From cdbe76245ccbb0cc78aa882b960cf1a8587b5556 Mon Sep 17 00:00:00 2001
From: ailepet
Date: Fri, 18 Jul 2025 16:16:51 +0200
Subject: [PATCH 6/8] fix sql query

---
 dags/data_utils/grist/grist_push.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/dags/data_utils/grist/grist_push.py b/dags/data_utils/grist/grist_push.py
index e0f5d40..b7b4003 100644
--- a/dags/data_utils/grist/grist_push.py
+++ b/dags/data_utils/grist/grist_push.py
@@ -24,11 +24,12 @@ def retrieve_sql_data(engine):
             body,
             url,
             translated_state,
-            categories,
+            first_category,
             comments_count,
-            endorsements_count
+            endorsements_count,
+            CURRENT_TIMESTAMP(3) as imported_at
         FROM prod.proposals
-        ORDER BY id DESC
+        ORDER BY id
     """
     with engine.connect() as connection:
         result = connection.execute(text(query))
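
Note on the imported_at column added above: in PostgreSQL, CURRENT_TIMESTAMP(3) evaluates to the transaction start time with fractional seconds capped at 3 digits, so every row in one sync run shares the same imported_at value. A quick shape check, assuming an engine built by get_postgres_connection:

    from sqlalchemy import text

    with engine.connect() as connection:
        row = connection.execute(text("SELECT CURRENT_TIMESTAMP(3) AS imported_at")).one()
    # e.g. 2025-07-18 16:16:51.123+02:00
    print(row.imported_at)
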
From faa9c217a97112ecf0c8b94de3617cca5dd85270 Mon Sep 17 00:00:00 2001
From: ailepet
Date: Fri, 18 Jul 2025 16:37:58 +0200
Subject: [PATCH 7/8] wip

---
 dags/data_utils/postgres_helper/postgres_helper.py | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/dags/data_utils/postgres_helper/postgres_helper.py b/dags/data_utils/postgres_helper/postgres_helper.py
index 8030a11..d2fbf79 100644
--- a/dags/data_utils/postgres_helper/postgres_helper.py
+++ b/dags/data_utils/postgres_helper/postgres_helper.py
@@ -8,11 +8,12 @@ def get_postgres_connection(connection_name, database):
         # Retrieve the connection object using Airflow's BaseHook
         connection = BaseHook.get_connection(connection_name)
 
-        # Extract connection details
-        user = connection.login
-        password = connection.password
-        host = connection.host
-        port = connection.port
+        url_object = URL.create(
+            user = connection.login
+            password = connection.password
+            host = connection.host
+            port = connection.port
+        )
 
         # Create the SQLAlchemy engine
         return create_engine(f"postgresql://{user}:{password}@{host}:{port}/{database}")

From 7c15e6e4c640894fa2ab2caf789f6143652003b2 Mon Sep 17 00:00:00 2001
From: ailepet
Date: Fri, 18 Jul 2025 16:42:09 +0200
Subject: [PATCH 8/8] migrate to URL method from SQLAlchemy

---
 dags/data_utils/postgres_helper/postgres_helper.py | 14 ++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/dags/data_utils/postgres_helper/postgres_helper.py b/dags/data_utils/postgres_helper/postgres_helper.py
index d2fbf79..01f1533 100644
--- a/dags/data_utils/postgres_helper/postgres_helper.py
+++ b/dags/data_utils/postgres_helper/postgres_helper.py
@@ -1,4 +1,4 @@
-from sqlalchemy import create_engine, text
+from sqlalchemy import create_engine, URL, text
 from airflow.hooks.base import BaseHook
 import logging
 
@@ -9,14 +9,16 @@ def get_postgres_connection(connection_name, database):
         connection = BaseHook.get_connection(connection_name)
 
         url_object = URL.create(
-            user = connection.login
-            password = connection.password
-            host = connection.host
-            port = connection.port
+            "postgresql",
+            username = connection.login,
+            password = connection.password,
+            host = connection.host,
+            port = connection.port,
+            database = database,
         )
 
         # Create the SQLAlchemy engine
-        return create_engine(f"postgresql://{user}:{password}@{host}:{port}/{database}")
+        return create_engine(url_object)
 
     except Exception as e:
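
Note on the URL.create migration above: unlike the earlier f-string, URL.create escapes each component, so credentials containing URL-special characters (@, /, %) no longer corrupt the DSN. A minimal sketch with illustrative values:

    from sqlalchemy import URL, create_engine

    url_object = URL.create(
        "postgresql",
        username="reader",     # illustrative credentials, not from the repo
        password="p@ss/word",  # special characters are quoted safely
        host="localhost",
        port=5432,
        database="cdc",
    )
    engine = create_engine(url_object)
    # The password renders percent-encoded:
    # postgresql://reader:p%40ss%2Fword@localhost:5432/cdc
    print(url_object.render_as_string(hide_password=False))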