Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-72: Exposing 'extra_dejson' on Connection definition #45448

Merged
merged 7 commits into from
Jan 7, 2025

Conversation

amoghrajesh
Copy link
Contributor

@amoghrajesh amoghrajesh commented Jan 7, 2025

closes: #45443

Connection model supports extra_dejson which is a deserialised value of extra as a python dictionary: https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html#airflow-connections-in-templates

For AIP 72 and enablement of multi language support, we should have this lie in the task sdk side, so that the API client and server cleanly only communicate in JSON strings and the responsibility of deserialising and serialising lies on the client whilst sending to task sdk.

Testing

DAG:

from __future__ import annotations

from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import dag


class CustomOperator(BaseOperator):
    def execute(self, context):
        import os
        task_id = context["task_instance"].task_id
        print(f"Hello World {task_id}!")
        print(context)
        print(context["conn"].airflow_db)
        print(context["conn"].airflow_db.extra_dejson)



@dag()
def super_basic_run():
    CustomOperator(task_id="hello")


super_basic_run()

Steps:

  1. In the existing connections, edited the airflow_db to add extra:
    image

  2. Added extra as:

{
  "extra-key": "value"
}
  1. Ran the DAG to see results
    image

  2. Logs:

92aebea29925
 ▶ Log message source details
{"logger":"airflow.models.dagbag.DagBag","timestamp":"2025-01-07T04:30:47.832247","event":"Filling up the DagBag from /files/dags/conn_from_context.py","level":"info"}
{"logger":"airflow.models.dagbag.DagBag","timestamp":"2025-01-07T04:30:47.832785","event":"Importing /files/dags/conn_from_context.py","level":"debug"}
{"logger":"airflow.models.dagbag.DagBag","timestamp":"2025-01-07T04:30:47.835806","event":"Loaded DAG <DAG: super_basic_run>","level":"debug"}
{"file":"/files/dags/conn_from_context.py","timestamp":"2025-01-07T04:30:47.835971","logger":"task","event":"DAG file parsed","level":"debug"}
{"logger":"airflow.task.operators.unusual_prefix_0743271d82731f02d7e57e687278453c55c78854_conn_from_context.CustomOperator","timestamp":"2025-01-07T04:30:47.860640","event":"CustomOperator.execute cannot be called outside TaskInstance!","level":"warning"}
{"json":"{\"conn_id\":\"airflow_db\",\"type\":\"GetConnection\"}\n","timestamp":"2025-01-07T04:30:47.860845","logger":"task","event":"Sending request","level":"debug"}
{"chan":"stdout","event":"Hello World hello!","timestamp":"2025-01-07T04:30:47.861110Z","level":"info","logger":"task"}
{"chan":"stdout","event":"{'dag': <DAG: super_basic_run>, 'inlets': [], 'map_index_template': None, 'outlets': [], 'run_id': 'manual__2025-01-07T04:30:47.499291+00:00', 'task': <Task(CustomOperator): hello>, 'task_instance': RuntimeTaskInstance(id=UUID('01943f07-8edc-7c2b-8de6-9a0a8e9c9129'), task_id='hello', dag_id='super_basic_run', run_id='manual__2025-01-07T04:30:47.499291+00:00', try_number=1, map_index=-1, task=<Task(CustomOperator): hello>), 'ti': RuntimeTaskInstance(id=UUID('01943f07-8edc-7c2b-8de6-9a0a8e9c9129'), task_id='hello', dag_id='super_basic_run', run_id='manual__2025-01-07T04:30:47.499291+00:00', try_number=1, map_index=-1, task=<Task(CustomOperator): hello>), 'conn': <ConnectionAccessor (dynamic access)>, 'dag_run': DagRun(dag_id='super_basic_run', run_id='manual__2025-01-07T04:30:47.499291+00:00', logical_date=datetime.datetime(2025, 1, 7, 4, 30, 47, 499291, tzinfo=TzInfo(UTC)), data_interval_start=datetime.datetime(2025, 1, 7, 4, 30, 47, 499291, tzinfo=TzInfo(UTC)), data_interval_end=datetime.datetime(2025, 1, 7, 4, 30, 47, 499291, tzinfo=TzInfo(UTC)), start_date=datetime.datetime(2025, 1, 7, 4, 30, 47, 669611, tzinfo=TzInfo(UTC)), end_date=None, run_type=<DagRunType.MANUAL: 'manual'>, conf={}), 'data_interval_end': datetime.datetime(2025, 1, 7, 4, 30, 47, 499291, tzinfo=TzInfo(UTC)), 'data_interval_start': datetime.datetime(2025, 1, 7, 4, 30, 47, 499291, tzinfo=TzInfo(UTC)), 'logical_date': datetime.datetime(2025, 1, 7, 4, 30, 47, 499291, tzinfo=TzInfo(UTC)), 'ds': '2025-01-07', 'ds_nodash': '20250107', 'task_instance_key_str': 'super_basic_run__hello__20250107', 'ts': '2025-01-07T04:30:47.499291+00:00', 'ts_nodash': '20250107T043047', 'ts_nodash_with_tz': '20250107T043047.499291+0000'}","timestamp":"2025-01-07T04:30:47.861187Z","level":"info","logger":"task"}
{"chan":"stdout","event":"Connection(conn_id='airflow_db', conn_type='mysql', description=None, host='mysql', schema=None, login='root', password=None, port=None, extra='{\"extra-key\": \"value\"}')","timestamp":"2025-01-07T04:30:47.870231Z","level":"info","logger":"task"}
{"json":"{\"conn_id\":\"airflow_db\",\"type\":\"GetConnection\"}\n","timestamp":"2025-01-07T04:30:47.870365","logger":"task","event":"Sending request","level":"debug"}
{"chan":"stdout","event":"{'extra-key': 'value'}","timestamp":"2025-01-07T04:30:47.875280Z","level":"info","logger":"task"}
{"json":"{\"state\":\"success\",\"end_date\":\"2025-01-07T04:30:47.874890Z\",\"type\":\"TaskState\"}\n","timestamp":"2025-01-07T04:30:47.874957","logger":"task","event":"Sending request","level":"debug"}

^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@amoghrajesh amoghrajesh requested review from kaxil and ashb January 7, 2025 08:38
@amoghrajesh amoghrajesh self-assigned this Jan 7, 2025
Copy link
Member

@kaxil kaxil left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just realized I had added some part of extra_dejson yesterday night before creating that issue:

#45444

but the tests you have here are still valid, could you rebase

@amoghrajesh amoghrajesh requested a review from kaxil January 7, 2025 10:20
@amoghrajesh amoghrajesh requested a review from kaxil January 7, 2025 10:38
Copy link
Member

@kaxil kaxil left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2 comments but lgtm otherwise

@amoghrajesh
Copy link
Contributor Author

The failure was due to a timeout. Retriggering

@amoghrajesh amoghrajesh merged commit c2ab839 into apache:main Jan 7, 2025
44 checks passed
@amoghrajesh amoghrajesh deleted the AIP72-connections-extra-dejson branch January 7, 2025 16:59
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add Support for Connection.extra_dejson in Task SDK
2 participants