diff --git a/dags/crossclient_aggregation.py b/dags/crossclient_aggregation.py index fdb9fac..6ccb08f 100644 --- a/dags/crossclient_aggregation.py +++ b/dags/crossclient_aggregation.py @@ -4,8 +4,10 @@ from data_utils.alerting.alerting import task_failed from clients import clients from data_utils.crossclient_aggregation.crossclient_pull import create_aggregated_tables +from data_utils.grist.grist_pull_all_clients import fetch_and_dump_data import logging +connection_name="main_db_cluster_name" queries = { "all_users": """SELECT id AS decidim_user_id, email, date_of_birth, gender, created_at, sign_in_count, current_sign_in_at, confirmed, managed, admin, deleted_at, blocked, spam, spam_reported_at, spam_probability FROM prod.all_users""", @@ -58,7 +60,7 @@ default_args={'owner': 'airflow'}, schedule='45 21 * * *', start_date=pendulum.datetime(2025, 6, 17, tz="UTC"), - catchup=True + catchup=False ) as dag: aggregate_crossclient_data = PythonOperator( @@ -69,9 +71,18 @@ on_failure_callback=task_failed, ) + fetch_grist_data = PythonOperator( + task_id='fetch_and_dump_grist_data', + python_callable=fetch_and_dump_data, + op_args=[f"{connection_name}"], + dag=dag, + on_failure_callback=task_failed, + ) + logger = logging.getLogger(__name__) logger.warn(":DEBUG: crossclient_aggregation - This is a log message") logger.warn(f":DEBUG: crossclient_aggregation> Queries : {queries}") logger.warn(f":DEBUG: crossclient_aggregation> Clients : {clients}") aggregate_crossclient_data - logger.warn(f":DEBUG: crossclient_aggregation> DAG terminated.") + fetch_grist_data + logger.warn(f":DEBUG: crossclient_aggregation> DAG terminated.") \ No newline at end of file diff --git a/dags/data_utils/grist/grist_pull.py b/dags/data_utils/grist/grist_pull.py index c335874..3fabf42 100644 --- a/dags/data_utils/grist/grist_pull.py +++ b/dags/data_utils/grist/grist_pull.py @@ -36,4 +36,4 @@ def fetch_and_dump_data(connection_name): drop_table_in_postgres(connection, table_name) 
dump_data_to_postgres(connection, df, table_name) - connection.close() + connection.close() \ No newline at end of file diff --git a/dags/data_utils/grist/grist_pull_all_clients.py b/dags/data_utils/grist/grist_pull_all_clients.py new file mode 100644 index 0000000..e0b252c --- /dev/null +++ b/dags/data_utils/grist/grist_pull_all_clients.py @@ -0,0 +1,39 @@ +from airflow.hooks.base import BaseHook +from grist_api import GristDocAPI +import pandas as pd +from ..postgres_helper.postgres_helper import dump_data_to_postgres, get_postgres_connection, drop_table_in_postgres +from airflow.models import Variable + +# Retrieve the connection object using Airflow's BaseHook
connection = BaseHook.get_connection("grist_osp") +grist_api_key = connection.password +grist_server = connection.host +grist_ca_doc_id = Variable.get("grist_ca_doc_id") + +# The Grist API key and server come from the 'grist_osp' Airflow connection above; no GRIST_API_KEY environment variable is needed. +api = GristDocAPI(grist_ca_doc_id, server=grist_server, api_key=grist_api_key) + + +def fetch_and_dump_data(connection_name): + data = api.fetch_table('Liste_clients_memoire') + df = pd.DataFrame(data) + + df['Prestations_2025'] = df['Prestations_2025'].astype(str) + + # Add boolean columns for each field + fields = ['Abo Decidim', 'Abo Grist', 'Abo Metabase', 'Bénévolat', + 'Conseil Decidim', 'Conseil Grist', 'Conseil Metabase', + 'Synthèses', 'Technique Decidim', 'Technique Metabase', 'Terminé'] + + # Mark each client row with whether the field name appears in its Prestations_2025 string + for field in fields: + df[field] = df['Prestations_2025'].apply(lambda x: field in x) + + engine = get_postgres_connection(connection_name, "aggregated_client_data") + connection = engine.connect() + table_name = "all_clients" + + drop_table_in_postgres(connection, table_name) + dump_data_to_postgres(connection, df, table_name) + + connection.close()