From 880d1e510369b79b4ac93ca05eecdc18bdcbf943 Mon Sep 17 00:00:00 2001 From: Pankaj Singh <98807258+pankajastro@users.noreply.github.com> Date: Fri, 23 Feb 2024 10:52:16 +0530 Subject: [PATCH] Remove legacy API call to generate token (#1484) * Remove legacy API call --- ...example_external_deployment_task_sensor.py | 23 ++++--------------- .../providers/core/sensors/external_task.py | 11 +-------- 2 files changed, 6 insertions(+), 28 deletions(-) diff --git a/astronomer/providers/core/example_dags/example_external_deployment_task_sensor.py b/astronomer/providers/core/example_dags/example_external_deployment_task_sensor.py index 279ef5abe..c56450cad 100644 --- a/astronomer/providers/core/example_dags/example_external_deployment_task_sensor.py +++ b/astronomer/providers/core/example_dags/example_external_deployment_task_sensor.py @@ -2,20 +2,20 @@ from datetime import timedelta from typing import Any, Dict -import requests from airflow import DAG from airflow.hooks.base import BaseHook from airflow.operators.python import PythonOperator -from airflow.utils.log.secrets_masker import mask_secret from airflow.utils.timezone import datetime from astronomer.providers.core.sensors.external_task import ( ExternalDeploymentTaskSensorAsync, ) +# The below Airflow connection is of type http +# Add host and password param in connection +# Example host for connection is: https://cll9nj92h00iw02j51htnafw3.astronomer.run/d4be4ykx +# Example password for connection is: Astro JWT API token DEPLOYMENT_CONN_ID = os.getenv("ASTRO_DEPLOYMENT_CONN_ID", "deployment_conn_id") -ASTRONOMER_KEY_ID = os.getenv("ASTRONOMER_KEY_ID", "") -ASTRONOMER_KEY_SECRET = os.getenv("ASTRONOMER_KEY_SECRET", "") EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6)) DAG_ID = os.getenv("DAG_ID", "") RUN_ID = os.getenv("RUN_ID", "") @@ -31,24 +31,11 @@ def astro_access_token() -> Dict[str, Any]: """Get the Headers with access token by making post request with client_id and client_secret""" conn = BaseHook.get_connection(DEPLOYMENT_CONN_ID) - _json = { - "audience": "astronomer-ee", - "grant_type": "client_credentials", - "client_id": ASTRONOMER_KEY_ID, - "client_secret": ASTRONOMER_KEY_SECRET, - } - token_resp = requests.post( - url=conn.host, - headers={"Content-type": "application/json"}, - json=_json, - ) - masked_access_token = token_resp.json()["access_token"] - mask_secret(masked_access_token) return { "cache-control": "no-cache", "content-type": "application/json", "accept": "application/json", - "Authorization": "Bearer " + masked_access_token, + "Authorization": "Bearer " + conn.password, } diff --git a/astronomer/providers/core/sensors/external_task.py b/astronomer/providers/core/sensors/external_task.py index 984e1123f..b1b277e8b 100644 --- a/astronomer/providers/core/sensors/external_task.py +++ b/astronomer/providers/core/sensors/external_task.py @@ -4,7 +4,6 @@ import warnings from typing import TYPE_CHECKING, Any -from airflow.providers.http.hooks.http import HttpHook from airflow.providers.http.sensors.http import HttpSensor from airflow.sensors.external_task import ExternalTaskSensor from airflow.utils.session import provide_session @@ -128,14 +127,6 @@ class ExternalDeploymentTaskSensorAsync(HttpSensor): def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) - self.hook = HttpHook( - method=self.method, - http_conn_id=self.http_conn_id, - tcp_keep_alive=self.tcp_keep_alive, - tcp_keep_alive_idle=self.tcp_keep_alive_idle, - tcp_keep_alive_count=self.tcp_keep_alive_count, - tcp_keep_alive_interval=self.tcp_keep_alive_interval, - ) def execute(self, context: Context) -> None: """Defers trigger class to poll for state of the job run until it reaches a failure state or success state""" @@ -143,7 +134,7 @@ def execute(self, context: Context) -> None: timeout=self.execution_timeout, trigger=ExternalDeploymentTaskTrigger( http_conn_id=self.http_conn_id, - method=self.hook.method, + method=self.method, endpoint=self.endpoint, data=self.request_params, headers=self.headers,