Skip to content

Commit

Permalink
Remove legacy API call to generate token (#1484)
Browse files Browse the repository at this point in the history
* Remove legacy API call
  • Loading branch information
pankajastro authored Feb 23, 2024
1 parent ea23093 commit 880d1e5
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@
from datetime import timedelta
from typing import Any, Dict

import requests
from airflow import DAG
from airflow.hooks.base import BaseHook
from airflow.operators.python import PythonOperator
from airflow.utils.log.secrets_masker import mask_secret
from airflow.utils.timezone import datetime

from astronomer.providers.core.sensors.external_task import (
ExternalDeploymentTaskSensorAsync,
)

# The below Airflow connection is of type http
# Add host and password param in connection
# Example host for connection is: https://cll9nj92h00iw02j51htnafw3.astronomer.run/d4be4ykx
# Example password for connection is: Astro JWT API token
DEPLOYMENT_CONN_ID = os.getenv("ASTRO_DEPLOYMENT_CONN_ID", "deployment_conn_id")
ASTRONOMER_KEY_ID = os.getenv("ASTRONOMER_KEY_ID", "")
ASTRONOMER_KEY_SECRET = os.getenv("ASTRONOMER_KEY_SECRET", "")
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
DAG_ID = os.getenv("DAG_ID", "")
RUN_ID = os.getenv("RUN_ID", "")
Expand All @@ -31,24 +31,11 @@
def astro_access_token() -> Dict[str, Any]:
"""Get the Headers with access token by making post request with client_id and client_secret"""
conn = BaseHook.get_connection(DEPLOYMENT_CONN_ID)
_json = {
"audience": "astronomer-ee",
"grant_type": "client_credentials",
"client_id": ASTRONOMER_KEY_ID,
"client_secret": ASTRONOMER_KEY_SECRET,
}
token_resp = requests.post(
url=conn.host,
headers={"Content-type": "application/json"},
json=_json,
)
masked_access_token = token_resp.json()["access_token"]
mask_secret(masked_access_token)
return {
"cache-control": "no-cache",
"content-type": "application/json",
"accept": "application/json",
"Authorization": "Bearer " + masked_access_token,
"Authorization": "Bearer " + conn.password,
}


Expand Down
11 changes: 1 addition & 10 deletions astronomer/providers/core/sensors/external_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import warnings
from typing import TYPE_CHECKING, Any

from airflow.providers.http.hooks.http import HttpHook
from airflow.providers.http.sensors.http import HttpSensor
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.session import provide_session
Expand Down Expand Up @@ -128,22 +127,14 @@ class ExternalDeploymentTaskSensorAsync(HttpSensor):

def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.hook = HttpHook(
method=self.method,
http_conn_id=self.http_conn_id,
tcp_keep_alive=self.tcp_keep_alive,
tcp_keep_alive_idle=self.tcp_keep_alive_idle,
tcp_keep_alive_count=self.tcp_keep_alive_count,
tcp_keep_alive_interval=self.tcp_keep_alive_interval,
)

def execute(self, context: Context) -> None:
"""Defers trigger class to poll for state of the job run until it reaches a failure state or success state"""
self.defer(
timeout=self.execution_timeout,
trigger=ExternalDeploymentTaskTrigger(
http_conn_id=self.http_conn_id,
method=self.hook.method,
method=self.method,
endpoint=self.endpoint,
data=self.request_params,
headers=self.headers,
Expand Down

0 comments on commit 880d1e5

Please sign in to comment.