Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v2.0.4 #104

Merged
merged 35 commits into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
739f335
first pass at operator
astro-anand Mar 26, 2024
64337c5
remove unused constants
astro-anand Mar 26, 2024
f96c355
starship dagrun migration hook
astro-anand Mar 29, 2024
1e84ca8
Migration hook methods for loading dagruns and TIs
astro-anand Apr 1, 2024
90a7111
version bump
astro-anand Apr 1, 2024
25a7088
Merge remote-tracking branch 'origin' into starship_dag_migration_ope…
astro-anand Apr 1, 2024
8c25b23
precommit fixes
astro-anand Apr 1, 2024
062d14e
tests and fixes
astro-anand Apr 10, 2024
303ed90
logging and docs
astro-anand Apr 22, 2024
c28cec3
union
astro-anand Apr 22, 2024
7d93914
add unpause option
astro-anand Apr 25, 2024
c548b20
env var button for software, and query extra info required in Setup page
fritz-astronomer May 22, 2024
d2853db
better toast for validation checkbox, only allow tabs if all data is …
fritz-astronomer May 22, 2024
192a95a
add additional error handling while fetching data, remove infinite lo…
fritz-astronomer May 23, 2024
0d13beb
Merge pull request #102 from astronomer/env
fritz-astronomer May 23, 2024
1e5197a
hide env vars. Max width on table columns to fix overflow
fritz-astronomer May 23, 2024
2c00754
add Telescope page
fritz-astronomer May 23, 2024
ae53e73
Merge pull request #103 from astronomer/telescope
fritz-astronomer May 23, 2024
d72215b
Merge branch 'dev' into starship_dag_migration_operator
fritz-astronomer May 23, 2024
f5ac3e9
re-export setDagData so it can be unit tested
fritz-astronomer May 24, 2024
c8b0f16
modify to match standard classpaths
fritz-astronomer May 24, 2024
a7fd133
big WIP DRY refactor. Used the starship_compat layer directly
fritz-astronomer May 24, 2024
ea9137b
task-group and DAG approach, manually tested
fritz-astronomer May 31, 2024
5d90ede
add docs. Doc API errors
fritz-astronomer May 31, 2024
152c937
Merge branch 'dev' into starship_dag_migration_operator
fritz-astronomer May 31, 2024
4e40ca5
remove test
fritz-astronomer May 31, 2024
559da6d
Merge branch 'starship_dag_migration_operator' of github.com:astronom…
fritz-astronomer May 31, 2024
ad571dc
Merge pull request #100 from astronomer/starship_dag_migration_operator
fritz-astronomer May 31, 2024
465f3c7
bump to 2.0.4
fritz-astronomer May 31, 2024
403fd93
swap print -> logging.info
fritz-astronomer Jun 7, 2024
69a56ee
isolate queries to constants.js
fritz-astronomer Jun 7, 2024
ce76566
remove console.log
fritz-astronomer Jun 7, 2024
66ae5b5
move around python imports
fritz-astronomer Jun 7, 2024
1da202c
update operator doc install instructions
fritz-astronomer Jun 7, 2024
61b6836
move back type hint imports
fritz-astronomer Jun 7, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion astronomer_starship/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "2.0.3"
__version__ = "2.0.4"


def get_provider_info():
Expand Down
Empty file.
Empty file.
Empty file.
253 changes: 253 additions & 0 deletions astronomer_starship/providers/starship/hooks/starship.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
"""
Hooks for interacting with Starship migrations
"""
from abc import ABC, abstractmethod

from typing import List

from airflow.providers.http.hooks.http import HttpHook
from airflow.hooks.base import BaseHook

from astronomer_starship.starship_api import starship_compat

POOLS_ROUTE = "/api/starship/pools"
CONNECTIONS_ROUTE = "/api/starship/connections"
VARIABLES_ROUTE = "/api/starship/variables"
DAGS_ROUTE = "/api/starship/dags"
DAG_RUNS_ROUTE = "/api/starship/dag_runs"
TASK_INSTANCES_ROUTE = "/api/starship/task_instances"


class StarshipHook(ABC):
@abstractmethod
def get_variables(self):
pass

@abstractmethod
def set_variable(self, **kwargs):
pass

@abstractmethod
def get_pools(self):
pass

@abstractmethod
def set_pool(self, **kwargs):
pass

@abstractmethod
def get_connections(self):
pass

@abstractmethod
def set_connection(self, **kwargs):
pass

@abstractmethod
def get_dags(self):
pass

@abstractmethod
def set_dag_is_paused(self, dag_id: str, is_paused: bool):
pass

@abstractmethod
def get_dag_runs(self, dag_id: str, offset: int = 0, limit: int = 10) -> dict:
pass

@abstractmethod
def set_dag_runs(self, dag_runs: list):
pass

@abstractmethod
def get_task_instances(self, dag_id: str, offset: int = 0, limit: int = 10):
pass

@abstractmethod
def set_task_instances(self, task_instances: list):
pass


class StarshipLocalHook(BaseHook, StarshipHook):
"""Hook to retrieve local Airflow data, which can then be sent to the Target Starship instance."""

def get_variables(self):
"""
Get all variables from the local Airflow instance.
"""
return starship_compat.get_variables()

def set_variable(self, **kwargs):
raise RuntimeError("Setting local data is not supported")

def get_pools(self):
"""
Get all pools from the local Airflow instance.
"""
return starship_compat.get_pools()

def set_pool(self, **kwargs):
raise RuntimeError("Setting local data is not supported")

# noinspection PyMethodOverriding
def get_connections(self):
"""
Get all connections from the local Airflow instance.
"""
return starship_compat.get_connections()

def set_connection(self, **kwargs):
raise RuntimeError("Setting local data is not supported")

def get_dags(self) -> dict:
"""
Get all DAGs from the local Airflow instance.
"""
return starship_compat.get_dags()

def set_dag_is_paused(self, dag_id: str, is_paused: bool):
"""
Set the paused status of a DAG in the local Airflow instance.
"""
return starship_compat.set_dag_is_paused(dag_id, is_paused)

def get_dag_runs(self, dag_id: str, offset: int = 0, limit: int = 10) -> dict:
"""
Get DAG runs from the local Airflow instance.
"""
return starship_compat.get_dag_runs(dag_id, offset=offset, limit=limit)

def set_dag_runs(self, dag_runs: list):
raise RuntimeError("Setting local data is not supported")

def get_task_instances(self, dag_id: str, offset: int = 0, limit: int = 10):
"""
Get task instances from the local Airflow instance.
"""
return starship_compat.get_task_instances(dag_id, offset=offset, limit=limit)

def set_task_instances(self, task_instances: list):
raise RuntimeError("Setting local data is not supported")


class StarshipHttpHook(HttpHook, StarshipHook):
def get_variables(self):
"""
Get all variables from the Target Starship instance.
"""
conn = self.get_conn()
url = self.url_from_endpoint(VARIABLES_ROUTE)
res = conn.get(url)
res.raise_for_status()
return res.json()

def set_variable(self, **kwargs):
"""
Set a variable in the Target Starship instance.
"""
conn = self.get_conn()
url = self.url_from_endpoint(VARIABLES_ROUTE)
res = conn.post(url, json=kwargs)
res.raise_for_status()
return res.json()

def get_pools(self):
"""
Get all pools from the Target Starship instance.
"""
conn = self.get_conn()
url = self.url_from_endpoint(POOLS_ROUTE)
res = conn.get(url)
res.raise_for_status()
return res.json()

def set_pool(self, **kwargs):
"""
Set a pool in the Target Starship instance.
"""
conn = self.get_conn()
url = self.url_from_endpoint(POOLS_ROUTE)
res = conn.post(url, json=kwargs)
res.raise_for_status()
return res.json()

# noinspection PyMethodOverriding
def get_connections(self):
"""
Get all connections from the Target Starship instance.
"""
conn = self.get_conn()
url = self.url_from_endpoint(CONNECTIONS_ROUTE)
res = conn.get(url)
res.raise_for_status()
return res.json()

def set_connection(self, **kwargs):
"""
Set a connection in the Target Starship instance.
"""
conn = self.get_conn()
url = self.url_from_endpoint(CONNECTIONS_ROUTE)
res = conn.post(url, json=kwargs)
res.raise_for_status()
return res.json()

def get_dags(self) -> dict:
"""
Get all DAGs from the Target Starship instance.
"""
conn = self.get_conn()
url = self.url_from_endpoint(DAGS_ROUTE)
res = conn.get(url)
res.raise_for_status()
return res.json()

def set_dag_is_paused(self, dag_id: str, is_paused: bool):
"""
Set the paused status of a DAG in the Target Starship instance.
"""
conn = self.get_conn()
url = self.url_from_endpoint(DAGS_ROUTE)
res = conn.patch(url, json={"dag_id": dag_id, "is_paused": is_paused})
res.raise_for_status()
return res.json()

def get_dag_runs(self, dag_id: str, offset: int = 0, limit: int = 10) -> dict:
"""
Get DAG runs from the Target Starship instance.
"""
conn = self.get_conn()
url = self.url_from_endpoint(DAG_RUNS_ROUTE)
res = conn.get(url, params={"dag_id": dag_id, "limit": limit})
res.raise_for_status()
return res.json()

def set_dag_runs(self, dag_runs: List[dict]) -> dict:
"""
Set DAG runs in the Target Starship instance.
"""
conn = self.get_conn()
url = self.url_from_endpoint(DAG_RUNS_ROUTE)
res = conn.post(url, json={"dag_runs": dag_runs})
res.raise_for_status()
return res.json()

def get_task_instances(self, dag_id: str, offset: int = 0, limit: int = 10):
"""
Get task instances from the Target Starship instance.
"""
conn = self.get_conn()
url = self.url_from_endpoint(TASK_INSTANCES_ROUTE)
res = conn.get(url, params={"dag_id": dag_id, "limit": limit})
res.raise_for_status()
return res.json()

def set_task_instances(self, task_instances: list[dict]) -> dict:
"""
Set task instances in the Target Starship instance.
"""
conn = self.get_conn()
url = self.url_from_endpoint(TASK_INSTANCES_ROUTE)
res = conn.post(url, json={"task_instances": task_instances})
res.raise_for_status()
return res.json()
Empty file.
Loading
Loading