"""
Hooks for interacting with Starship migrations.

``StarshipLocalHook`` reads data from (and, for DAG pause state, writes to)
the local Airflow instance via ``starship_compat``; ``StarshipHttpHook``
talks to the Starship API of a remote ("Target") Airflow instance over HTTP.
"""
from abc import ABC, abstractmethod

from typing import List

from airflow.providers.http.hooks.http import HttpHook
from airflow.hooks.base import BaseHook

from astronomer_starship.starship_api import starship_compat

# Routes exposed by the Starship plugin on the target webserver.
POOLS_ROUTE = "/api/starship/pools"
CONNECTIONS_ROUTE = "/api/starship/connections"
VARIABLES_ROUTE = "/api/starship/variables"
DAGS_ROUTE = "/api/starship/dags"
DAG_RUNS_ROUTE = "/api/starship/dag_runs"
TASK_INSTANCES_ROUTE = "/api/starship/task_instances"


class StarshipHook(ABC):
    """Interface for Starship hooks: a getter and setter per migratable resource."""

    @abstractmethod
    def get_variables(self):
        pass

    @abstractmethod
    def set_variable(self, **kwargs):
        pass

    @abstractmethod
    def get_pools(self):
        pass

    @abstractmethod
    def set_pool(self, **kwargs):
        pass

    @abstractmethod
    def get_connections(self):
        pass

    @abstractmethod
    def set_connection(self, **kwargs):
        pass

    @abstractmethod
    def get_dags(self):
        pass

    @abstractmethod
    def set_dag_is_paused(self, dag_id: str, is_paused: bool):
        pass

    @abstractmethod
    def get_dag_runs(self, dag_id: str, offset: int = 0, limit: int = 10) -> dict:
        pass

    @abstractmethod
    def set_dag_runs(self, dag_runs: list):
        pass

    @abstractmethod
    def get_task_instances(self, dag_id: str, offset: int = 0, limit: int = 10):
        pass

    @abstractmethod
    def set_task_instances(self, task_instances: list):
        pass


class StarshipLocalHook(BaseHook, StarshipHook):
    """Hook to retrieve local Airflow data, which can then be sent to the Target Starship instance."""

    def get_variables(self):
        """
        Get all variables from the local Airflow instance.
        """
        return starship_compat.get_variables()

    def set_variable(self, **kwargs):
        raise RuntimeError("Setting local data is not supported")

    def get_pools(self):
        """
        Get all pools from the local Airflow instance.
        """
        return starship_compat.get_pools()

    def set_pool(self, **kwargs):
        raise RuntimeError("Setting local data is not supported")

    # noinspection PyMethodOverriding
    def get_connections(self):
        """
        Get all connections from the local Airflow instance.
        """
        return starship_compat.get_connections()

    def set_connection(self, **kwargs):
        raise RuntimeError("Setting local data is not supported")

    def get_dags(self) -> dict:
        """
        Get all DAGs from the local Airflow instance.
        """
        return starship_compat.get_dags()

    def set_dag_is_paused(self, dag_id: str, is_paused: bool):
        """
        Set the paused status of a DAG in the local Airflow instance.
        """
        return starship_compat.set_dag_is_paused(dag_id, is_paused)

    def get_dag_runs(self, dag_id: str, offset: int = 0, limit: int = 10) -> dict:
        """
        Get DAG runs from the local Airflow instance.
        """
        return starship_compat.get_dag_runs(dag_id, offset=offset, limit=limit)

    def set_dag_runs(self, dag_runs: list):
        raise RuntimeError("Setting local data is not supported")

    def get_task_instances(self, dag_id: str, offset: int = 0, limit: int = 10):
        """
        Get task instances from the local Airflow instance.
        """
        return starship_compat.get_task_instances(dag_id, offset=offset, limit=limit)

    def set_task_instances(self, task_instances: list):
        raise RuntimeError("Setting local data is not supported")


class StarshipHttpHook(HttpHook, StarshipHook):
    """Hook to read/write Starship data on the Target Airflow instance over HTTP."""

    def get_variables(self):
        """
        Get all variables from the Target Starship instance.
        """
        conn = self.get_conn()
        url = self.url_from_endpoint(VARIABLES_ROUTE)
        res = conn.get(url)
        res.raise_for_status()
        return res.json()

    def set_variable(self, **kwargs):
        """
        Set a variable in the Target Starship instance.
        """
        conn = self.get_conn()
        url = self.url_from_endpoint(VARIABLES_ROUTE)
        res = conn.post(url, json=kwargs)
        res.raise_for_status()
        return res.json()

    def get_pools(self):
        """
        Get all pools from the Target Starship instance.
        """
        conn = self.get_conn()
        url = self.url_from_endpoint(POOLS_ROUTE)
        res = conn.get(url)
        res.raise_for_status()
        return res.json()

    def set_pool(self, **kwargs):
        """
        Set a pool in the Target Starship instance.
        """
        conn = self.get_conn()
        url = self.url_from_endpoint(POOLS_ROUTE)
        res = conn.post(url, json=kwargs)
        res.raise_for_status()
        return res.json()

    # noinspection PyMethodOverriding
    def get_connections(self):
        """
        Get all connections from the Target Starship instance.
        """
        conn = self.get_conn()
        url = self.url_from_endpoint(CONNECTIONS_ROUTE)
        res = conn.get(url)
        res.raise_for_status()
        return res.json()

    def set_connection(self, **kwargs):
        """
        Set a connection in the Target Starship instance.
        """
        conn = self.get_conn()
        url = self.url_from_endpoint(CONNECTIONS_ROUTE)
        res = conn.post(url, json=kwargs)
        res.raise_for_status()
        return res.json()

    def get_dags(self) -> dict:
        """
        Get all DAGs from the Target Starship instance.
        """
        conn = self.get_conn()
        url = self.url_from_endpoint(DAGS_ROUTE)
        res = conn.get(url)
        res.raise_for_status()
        return res.json()

    def set_dag_is_paused(self, dag_id: str, is_paused: bool):
        """
        Set the paused status of a DAG in the Target Starship instance.
        """
        conn = self.get_conn()
        url = self.url_from_endpoint(DAGS_ROUTE)
        res = conn.patch(url, json={"dag_id": dag_id, "is_paused": is_paused})
        res.raise_for_status()
        return res.json()

    def get_dag_runs(self, dag_id: str, offset: int = 0, limit: int = 10) -> dict:
        """
        Get DAG runs from the Target Starship instance.
        """
        conn = self.get_conn()
        url = self.url_from_endpoint(DAG_RUNS_ROUTE)
        # FIX: previously `offset` was accepted but silently dropped, so
        # paging past the first `limit` runs was impossible.
        res = conn.get(url, params={"dag_id": dag_id, "offset": offset, "limit": limit})
        res.raise_for_status()
        return res.json()

    def set_dag_runs(self, dag_runs: List[dict]) -> dict:
        """
        Set DAG runs in the Target Starship instance.
        """
        conn = self.get_conn()
        url = self.url_from_endpoint(DAG_RUNS_ROUTE)
        res = conn.post(url, json={"dag_runs": dag_runs})
        res.raise_for_status()
        return res.json()

    def get_task_instances(self, dag_id: str, offset: int = 0, limit: int = 10):
        """
        Get task instances from the Target Starship instance.
        """
        conn = self.get_conn()
        url = self.url_from_endpoint(TASK_INSTANCES_ROUTE)
        # FIX: forward `offset` (previously ignored), matching the local hook.
        res = conn.get(url, params={"dag_id": dag_id, "offset": offset, "limit": limit})
        res.raise_for_status()
        return res.json()

    # FIX: was annotated `list[dict]` (Python >= 3.9 only); use typing.List
    # for consistency with every sibling method and older-interpreter support.
    def set_task_instances(self, task_instances: List[dict]) -> dict:
        """
        Set task instances in the Target Starship instance.
        """
        conn = self.get_conn()
        url = self.url_from_endpoint(TASK_INSTANCES_ROUTE)
        res = conn.post(url, json={"task_instances": task_instances})
        res.raise_for_status()
        return res.json()
"""Operators, TaskGroups, and DAGs for interacting with the Starship migrations."""
import logging
from datetime import datetime
from typing import Any, Union, List

import airflow
from airflow import DAG
from airflow.decorators import task
from airflow.exceptions import AirflowSkipException
from airflow.models.baseoperator import BaseOperator
from airflow.utils.context import Context
from airflow.utils.task_group import TaskGroup

from astronomer_starship.providers.starship.hooks.starship import (
    StarshipLocalHook,
    StarshipHttpHook,
)

# Compatability Notes:
# - @task() is >=AF2.0
# - @task_group is >=AF2.1
# - Dynamic Task Mapping is >=AF2.3
# - Dynamic Task Mapping labelling is >=AF2.9

# dag_id of the DAG built by StarshipAirflowMigrationDAG; used to keep the
# migration DAG from migrating (and pausing) itself.
_MIGRATION_DAG_ID = "starship_airflow_migration_dag"


def _airflow_version_at_least(minimum: tuple) -> bool:
    """Return True if the running Airflow version is >= ``minimum``.

    FIX: the previous ``airflow.__version__ >= "2.3.0"`` string comparison is
    lexicographic and wrongly reports "2.10.0" < "2.3.0". Compare numeric
    (major, minor, micro) tuples instead; non-numeric suffixes are ignored.
    """
    parts = []
    for piece in airflow.__version__.split(".")[:3]:
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        parts.append(int(digits) if digits else 0)
    return tuple(parts) >= minimum


class StarshipMigrationOperator(BaseOperator):
    """Base operator holding a local (source) hook and an HTTP (target) hook."""

    def __init__(self, http_conn_id=None, **kwargs):
        super().__init__(**kwargs)
        self.source_hook = StarshipLocalHook()
        self.target_hook = StarshipHttpHook(http_conn_id=http_conn_id)


class StarshipVariableMigrationOperator(StarshipMigrationOperator):
    """Operator to migrate a single Variable from one Airflow instance to another."""

    def __init__(self, variable_key: Union[str, None] = None, **kwargs):
        super().__init__(**kwargs)
        self.variable_key = variable_key

    def execute(self, context: Context) -> Any:
        # FIX: logging args require a %s placeholder — previously the key was
        # never rendered and logging raised a formatting error internally.
        logging.info("Getting Variable %s", self.variable_key)
        variables = self.source_hook.get_variables()
        variable: Union[dict, None] = (
            [v for v in variables if v["key"] == self.variable_key] or [None]
        )[0]
        if variable is not None:
            logging.info("Migrating Variable %s", self.variable_key)
            self.target_hook.set_variable(**variable)
        else:
            raise RuntimeError("Variable not found! " + self.variable_key)


def starship_variables_migration(variables: List[str] = None, **kwargs):
    """TaskGroup to fetch and migrate Variables from one Airflow instance to another."""
    with TaskGroup("variables") as tg:

        @task()
        def get_variables():
            _variables = StarshipLocalHook().get_variables()

            _variables = (
                [k["key"] for k in _variables if k["key"] in variables]
                if variables is not None
                else [k["key"] for k in _variables]
            )

            if not len(_variables):
                raise AirflowSkipException("Nothing to migrate")
            return _variables

        variables_results = get_variables()
        if _airflow_version_at_least((2, 3, 0)):
            StarshipVariableMigrationOperator.partial(
                task_id="migrate_variables", **kwargs
            ).expand(variable_key=variables_results)
        else:
            for variable in variables_results.output:
                variables_results >> StarshipVariableMigrationOperator(
                    task_id="migrate_variable_" + variable,
                    variable_key=variable,
                    **kwargs,
                )
    return tg


class StarshipPoolMigrationOperator(StarshipMigrationOperator):
    """Operator to migrate a single Pool from one Airflow instance to another."""

    def __init__(self, pool_name: Union[str, None] = None, **kwargs):
        super().__init__(**kwargs)
        self.pool_name = pool_name

    def execute(self, context: Context) -> Any:
        logging.info("Getting Pool %s", self.pool_name)
        pool: Union[dict, None] = (
            [v for v in self.source_hook.get_pools() if v["name"] == self.pool_name]
            or [None]
        )[0]
        if pool is not None:
            logging.info("Migrating Pool %s", self.pool_name)
            self.target_hook.set_pool(**pool)
        else:
            # FIX: include the pool name, consistent with the Variable operator.
            raise RuntimeError("Pool not found! " + str(self.pool_name))


def starship_pools_migration(pools: List[str] = None, **kwargs):
    """TaskGroup to fetch and migrate Pools from one Airflow instance to another."""
    with TaskGroup("pools") as tg:

        @task()
        def get_pools():
            _pools = StarshipLocalHook().get_pools()
            _pools = (
                [k["name"] for k in _pools if k["name"] in pools]
                if pools is not None
                else [k["name"] for k in _pools]
            )

            if not len(_pools):
                raise AirflowSkipException("Nothing to migrate")
            return _pools

        pools_result = get_pools()
        if _airflow_version_at_least((2, 3, 0)):
            StarshipPoolMigrationOperator.partial(
                task_id="migrate_pools", **kwargs
            ).expand(pool_name=pools_result)
        else:
            for pool in pools_result.output:
                pools_result >> StarshipPoolMigrationOperator(
                    task_id="migrate_pool_" + pool, pool_name=pool, **kwargs
                )
    return tg


class StarshipConnectionMigrationOperator(StarshipMigrationOperator):
    """Operator to migrate a single Connection from one Airflow instance to another."""

    def __init__(self, connection_id: Union[str, None] = None, **kwargs):
        super().__init__(**kwargs)
        self.connection_id = connection_id

    def execute(self, context: Context) -> Any:
        logging.info("Getting Connection %s", self.connection_id)
        connection: Union[dict, None] = (
            [
                v
                for v in self.source_hook.get_connections()
                if v["conn_id"] == self.connection_id
            ]
            or [None]
        )[0]
        if connection is not None:
            logging.info("Migrating Connection %s", self.connection_id)
            self.target_hook.set_connection(**connection)
        else:
            raise RuntimeError("Connection not found! " + str(self.connection_id))


def starship_connections_migration(connections: List[str] = None, **kwargs):
    """TaskGroup to fetch and migrate Connections from one Airflow instance to another."""
    with TaskGroup("connections") as tg:

        @task()
        def get_connections():
            _connections = StarshipLocalHook().get_connections()
            _connections = (
                [k["conn_id"] for k in _connections if k["conn_id"] in connections]
                if connections is not None
                else [k["conn_id"] for k in _connections]
            )

            if not len(_connections):
                raise AirflowSkipException("Nothing to migrate")
            return _connections

        connections_result = get_connections()
        if _airflow_version_at_least((2, 3, 0)):
            StarshipConnectionMigrationOperator.partial(
                task_id="migrate_connections", **kwargs
            ).expand(connection_id=connections_result)
        else:
            for connection in connections_result.output:
                # FIX: `connection` is already a conn_id string here —
                # `connection.conn_id` raised AttributeError.
                connections_result >> StarshipConnectionMigrationOperator(
                    task_id="migrate_connection_" + connection,
                    connection_id=connection,
                    **kwargs,
                )
    return tg


class StarshipDagHistoryMigrationOperator(StarshipMigrationOperator):
    """Operator to migrate a single DAG from one Airflow instance to another, with it's history."""

    def __init__(
        self,
        target_dag_id: str,
        unpause_dag_in_target: bool = False,
        dag_run_limit: int = 10,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.target_dag_id = target_dag_id
        self.unpause_dag_in_target = unpause_dag_in_target
        self.dag_run_limit = dag_run_limit

    def execute(self, context):
        logging.info("Pausing local DAG for %s", self.target_dag_id)
        self.source_hook.set_dag_is_paused(dag_id=self.target_dag_id, is_paused=True)
        # TODO - Poll until all tasks are done

        logging.info("Getting local DAG Runs for %s", self.target_dag_id)
        dag_runs = self.source_hook.get_dag_runs(
            dag_id=self.target_dag_id, limit=self.dag_run_limit
        )
        if len(dag_runs["dag_runs"]) == 0:
            raise AirflowSkipException("No DAG Runs found for " + self.target_dag_id)

        logging.info("Getting local Task Instances for %s", self.target_dag_id)
        task_instances = self.source_hook.get_task_instances(
            dag_id=self.target_dag_id, limit=self.dag_run_limit
        )
        if len(task_instances["task_instances"]) == 0:
            raise AirflowSkipException(
                "No Task Instances found for " + self.target_dag_id
            )

        logging.info("Setting target DAG Runs for %s", self.target_dag_id)
        self.target_hook.set_dag_runs(dag_runs=dag_runs["dag_runs"])

        logging.info("Setting target Task Instances for %s", self.target_dag_id)
        self.target_hook.set_task_instances(
            task_instances=task_instances["task_instances"]
        )

        if self.unpause_dag_in_target:
            logging.info("Unpausing target DAG for %s", self.target_dag_id)
            self.target_hook.set_dag_is_paused(
                dag_id=self.target_dag_id, is_paused=False
            )


def starship_dag_history_migration(dag_ids: List[str] = None, **kwargs):
    """TaskGroup to fetch and migrate DAGs with their history from one Airflow instance to another."""
    with TaskGroup("dag_history") as tg:

        @task()
        def get_dags():
            _dags = StarshipLocalHook().get_dags()
            # FIX: the exclusion previously compared against the class name
            # "StarshipAirflowMigrationDAG", but the DAG's actual dag_id is
            # "starship_airflow_migration_dag" — the migration DAG would have
            # paused and migrated itself.
            _dags = (
                [
                    k["dag_id"]
                    for k in _dags
                    if k["dag_id"] in dag_ids
                    and k["dag_id"] != _MIGRATION_DAG_ID
                ]
                if dag_ids is not None
                else [
                    k["dag_id"]
                    for k in _dags
                    if k["dag_id"] != _MIGRATION_DAG_ID
                ]
            )

            if not len(_dags):
                raise AirflowSkipException("Nothing to migrate")
            return _dags

        dags_result = get_dags()
        if _airflow_version_at_least((2, 3, 0)):
            StarshipDagHistoryMigrationOperator.partial(
                task_id="migrate_dag_ids",
                **(
                    # Dynamic task mapping labelling is >=AF2.9.
                    {"map_index_template": "{{ task.target_dag_id }}"}
                    if _airflow_version_at_least((2, 9, 0))
                    else {}
                ),
                **kwargs,
            ).expand(target_dag_id=dags_result)
        else:
            for dag_id in dags_result.output:
                dags_result >> StarshipDagHistoryMigrationOperator(
                    task_id="migrate_dag_" + dag_id, target_dag_id=dag_id, **kwargs
                )
    return tg


# noinspection PyPep8Naming
def StarshipAirflowMigrationDAG(
    http_conn_id: str,
    variables: List[str] = None,
    pools: List[str] = None,
    connections: List[str] = None,
    dag_ids: List[str] = None,
    **kwargs,
):
    """
    DAG to fetch and migrate Variables, Pools, Connections, and DAGs with history from one Airflow instance to another.
    """
    # FIX: DAG(schedule=...) only exists in Airflow >=2.4; this module
    # explicitly supports older versions, which need schedule_interval.
    schedule_kwargs = (
        {"schedule": "@once"}
        if _airflow_version_at_least((2, 4, 0))
        else {"schedule_interval": "@once"}
    )
    dag = DAG(
        dag_id=_MIGRATION_DAG_ID,
        start_date=datetime(1970, 1, 1),
        tags=["migration", "starship"],
        default_args={"owner": "Astronomer"},
        doc_md="""
    # Starship Migration DAG
    A DAG to migrate Airflow Variables, Pools, Connections, and DAG History from one Airflow instance to another.

    You can use this DAG to migrate all items, or specific items by providing a list of names.

    You can skip migration by providing an empty list.

    ## Setup:
    Make a connection in Airflow with the following details:
    - **Conn ID**: `starship_default`
    - **Conn Type**: `HTTP`
    - **Host**: the URL of the homepage of Airflow (excluding `/home` on the end of the URL)
        - For example, if your deployment URL is `https://astronomer.astronomer.run/abcdt4ry/home`, you'll use `https://astronomer.astronomer.run/abcdt4ry`
    - **Schema**: `https`
    - **Extras**: `{"Authorization": "Bearer "}`

    ## Usage:
    ```python
    from astronomer_starship.providers.starship.operators.starship import (
        StarshipAirflowMigrationDAG,
    )

    globals()["starship_airflow_migration_dag"] = StarshipAirflowMigrationDAG(
        http_conn_id="starship_default",
        variables=None,  # None to migrate all, or ["var1", "var2"] to migrate specific items, or empty list to skip all
        pools=None,  # None to migrate all, or ["pool1", "pool2"] to migrate specific items, or empty list to skip all
        connections=None,  # None to migrate all, or ["conn1", "conn2"] to migrate specific items, or empty list to skip all
        dag_ids=None,  # None to migrate all, or ["dag1", "dag2"] to migrate specific items, or empty list to skip all
    )
    ```
    """,  # noqa: E501
        **schedule_kwargs,
    )
    with dag:
        starship_variables_migration(
            variables=variables, http_conn_id=http_conn_id, **kwargs
        )
        starship_pools_migration(pools=pools, http_conn_id=http_conn_id, **kwargs)
        starship_connections_migration(
            connections=connections, http_conn_id=http_conn_id, **kwargs
        )
        starship_dag_history_migration(
            dag_ids=dag_ids, http_conn_id=http_conn_id, **kwargs
        )
    return dag
action.token && state.isValidUrl, }; } case 'toggle-is-astro': { @@ -96,6 +105,7 @@ export const reducer = (state, action) => { isAstro: !state.isAstro, isProductSelected: true, targetUrl: getTargetUrlFromParts(state.urlOrgPart, state.urlDeploymentPart, !state.isAstro), + token: null, isSetupComplete: false, }; } @@ -103,10 +113,40 @@ export const reducer = (state, action) => { return { ...state, isProductSelected: true }; } case 'set-is-starship': { - return { ...state, isStarship: action.isStarship }; + return { + ...state, + isStarship: action.isStarship, + isSetupComplete: action.isStarship && state.isAirflow && state.token && state.isValidUrl, + }; } case 'set-is-airflow': { - return { ...state, isAirflow: action.isAirflow }; + return { + ...state, + isAirflow: action.isAirflow, + isSetupComplete: action.isAirflow && state.isStarship && state.token && state.isValidUrl, + }; + } + case 'set-software-info': { + return { + ...state, + releaseName: action.releaseName, + workspaceId: action.workspaceId, + deploymentId: action.deploymentId, + }; + } + + // ### Telescope ### + case 'set-telescope-org': { + return { + ...state, + telescopeOrganizationId: action.telescopeOrganizationId, + }; + } + case 'set-telescope-presigned-url': { + return { + ...state, + telescopePresignedUrl: action.telescopePresignedUrl, + }; } // ### VARIABLES PAGE #### @@ -131,6 +171,7 @@ export const reducer = (state, action) => { return action.error.response.status === 401 ? { ...state, variablesError: action.error, + variablesLoading: false, isSetupComplete: false, isTokenTouched: false, token: null, @@ -159,6 +200,7 @@ export const reducer = (state, action) => { return action.error.response.status === 401 ? { ...state, connectionsError: action.error, + connectionsLoading: false, isSetupComplete: false, isTokenTouched: false, token: null, @@ -187,6 +229,7 @@ export const reducer = (state, action) => { return action.error.response.status === 401 ? 
{ ...state, poolsError: action.error, + poolsLoading: false, isSetupComplete: false, isTokenTouched: false, token: null, @@ -208,6 +251,8 @@ export const reducer = (state, action) => { ...state, envLocalData: action.envLocalData, envRemoteData: action.envRemoteData, + organizationId: action.envRemoteData['ASTRO_ORGANIZATION_ID'] || state.organizationId, + deploymentId: action.envRemoteData['ASTRO_DEPLOYMENT_ID'] || state.deploymentId, envLoading: false, }; } @@ -215,6 +260,7 @@ export const reducer = (state, action) => { return action.error.response.status === 401 ? { ...state, envError: action.error, + envLoading: false, isSetupComplete: false, isTokenTouched: false, token: null, @@ -241,6 +287,7 @@ export const reducer = (state, action) => { return action.error.response.status === 401 ? { ...state, dagsError: action.error, + dagsLoading: false, isSetupComplete: false, isTokenTouched: false, token: null, diff --git a/astronomer_starship/src/component/DataTable.jsx b/astronomer_starship/src/component/DataTable.jsx index 4ea72af..76051ac 100644 --- a/astronomer_starship/src/component/DataTable.jsx +++ b/astronomer_starship/src/component/DataTable.jsx @@ -1,6 +1,6 @@ import * as React from 'react'; import { - Table, Thead, Tbody, Tr, Th, Td, chakra, + Table, Thead, Tbody, Tr, Th, Td, chakra, TableContainer, } from '@chakra-ui/react'; import { TriangleDownIcon, TriangleUpIcon } from '@chakra-ui/icons'; import { @@ -21,58 +21,60 @@ export default function DataTable({ getCoreRowModel: getCoreRowModel(), onSortingChange: setSorting, getSortedRowModel: getSortedRowModel(), - state: { sorting }, + state: {sorting}, }); return ( - - - {table.getHeaderGroups().map((headerGroup) => ( - - {headerGroup.headers.map((header) => { - // see https://tanstack.com/table/v8/docs/api/core/column-def#meta to type this correctly - const { meta } = header.column.columnDef; - return ( - + ); + })} + + ))} + + + {table.getRowModel().rows.map((row) => ( + + {row.getVisibleCells().map((cell) 
=> { + // see https://tanstack.com/table/v8/docs/api/core/column-def#meta to type this correctly + const {meta} = cell.column.columnDef; + return ( + + ); + })} + + ))} + +
- {flexRender( - header.column.columnDef.header, - header.getContext(), - )} + + + + {table.getHeaderGroups().map((headerGroup) => ( + + {headerGroup.headers.map((header) => { + // see https://tanstack.com/table/v8/docs/api/core/column-def#meta to type this correctly + const {meta} = header.column.columnDef; + return ( + - ); - })} - - ))} - - - {table.getRowModel().rows.map((row) => ( - - {row.getVisibleCells().map((cell) => { - // see https://tanstack.com/table/v8/docs/api/core/column-def#meta to type this correctly - const { meta } = cell.column.columnDef; - return ( - - ); - })} - - ))} - -
+ {flexRender( + header.column.columnDef.header, + header.getContext(), + )} - - {header.column.getIsSorted() ? ( - header.column.getIsSorted() === 'desc' ? ( - - ) : ( - - ) - ) : null} - -
- {flexRender(cell.column.columnDef.cell, cell.getContext())} -
+ + {header.column.getIsSorted() ? ( + header.column.getIsSorted() === 'desc' ? ( + + ) : ( + + ) + ) : null} + +
+ {flexRender(cell.column.columnDef.cell, cell.getContext())} +
+ ); } diff --git a/astronomer_starship/src/component/ValidatedUrlCheckbox.jsx b/astronomer_starship/src/component/ValidatedUrlCheckbox.jsx index 3fde2e4..3e002a0 100644 --- a/astronomer_starship/src/component/ValidatedUrlCheckbox.jsx +++ b/astronomer_starship/src/component/ValidatedUrlCheckbox.jsx @@ -12,13 +12,29 @@ export default function ValidatedUrlCheckbox({ useEffect(() => { // noinspection JSCheckFunctionSignatures axios.get(proxyUrl(url), { headers: proxyHeaders(token) }) - .then((res) => setValid(res.status === 200)) + .then((res) => { + // Valid if it's a 200, has data, and is JSON + const isValid = ( + res.status === 200 && + res.data && + (res.headers['content-type'] === 'application/json' || res.data === "OK") + ); + setValid(isValid); + }) .catch((err) => { - toast({ - title: err.response?.data?.error || err.response?.data || err.message, - status: 'error', - isClosable: true, - }); + if (err.response.status === 404) { + toast({ + title: 'Not found', + status: 'error', + isClosable: true, + }); + } else { + toast({ + title: err.response?.data?.error || err.message || err.response?.data, + status: 'error', + isClosable: true, + }); + } setValid(false); }) .finally(() => setLoading.off()); diff --git a/astronomer_starship/src/constants.js b/astronomer_starship/src/constants.js index 68746f1..1a4a619 100644 --- a/astronomer_starship/src/constants.js +++ b/astronomer_starship/src/constants.js @@ -1,4 +1,5 @@ const constants = { + TELESCOPE_ROUTE: '/api/starship/telescope', ENV_VAR_ROUTE: '/api/starship/env_vars', POOL_ROUTE: '/api/starship/pools', CONNECTIONS_ROUTE: '/api/starship/connections', @@ -8,3 +9,42 @@ const constants = { TASK_INSTANCE_ROUTE: '/api/starship/task_instances', }; export default constants; + +export const updateDeploymentVariablesMutation = ` +mutation UpdateDeploymentVariables( + $deploymentUuid:Uuid!, + $releaseName:String!, + $environmentVariables: [InputEnvironmentVariable!]! 
+) { + updateDeploymentVariables( + deploymentUuid: $deploymentUuid, + releaseName: $releaseName, + environmentVariables: $environmentVariables + ) { + key + value + isSecret + } +}`; + +export const getDeploymentsQuery = `query deploymentVariables($deploymentUuid: Uuid!, $releaseName: String!) { + deploymentVariables( + deploymentUuid: $deploymentUuid + releaseName: $releaseName + ) { + key + value + isSecret + } +}`; + +export const getWorkspaceDeploymentsQuery = ` +query workspaces { + workspaces { + id + deployments { + id + releaseName + } + } +}`; diff --git a/astronomer_starship/src/pages/EnvVarsPage.jsx b/astronomer_starship/src/pages/EnvVarsPage.jsx index a152a23..c10788e 100644 --- a/astronomer_starship/src/pages/EnvVarsPage.jsx +++ b/astronomer_starship/src/pages/EnvVarsPage.jsx @@ -1,9 +1,6 @@ -/* eslint-disable no-nested-ternary */ import React, { useEffect, useState } from 'react'; import { createColumnHelper } from '@tanstack/react-table'; -import { - Text, Button, useToast, Tooltip, HStack, Spacer, -} from '@chakra-ui/react'; +import { Button, HStack, Spacer, Text, useToast, } from '@chakra-ui/react'; import PropTypes from 'prop-types'; import axios from 'axios'; import { MdErrorOutline } from 'react-icons/md'; @@ -11,75 +8,134 @@ import { FaCheck } from 'react-icons/fa'; import { GoUpload } from 'react-icons/go'; import { RepeatIcon } from '@chakra-ui/icons'; import StarshipPage from '../component/StarshipPage'; -import MigrateButton from '../component/MigrateButton'; import { - fetchData, localRoute, proxyHeaders, proxyUrl, remoteRoute, + fetchData, + getAstroEnvVarRoute, + getHoustonRoute, + localRoute, + proxyHeaders, + proxyUrl, + remoteRoute, } from '../util'; -import constants from '../constants'; +import constants, { getDeploymentsQuery, updateDeploymentVariablesMutation } from '../constants'; +import HiddenValue from "../component/HiddenValue.jsx"; + -// noinspection JSUnusedLocalSymbols -export function MigrateEnvButton({ - // 
eslint-disable-next-line no-unused-vars,react/prop-types - isAstro, route, headers, existsInRemote, sendData, +function EnvVarMigrateButton({ + route, headers, existsInRemote, sendData, isAstro, deploymentId, releaseName }) { const [loading, setLoading] = useState(false); const [error, setError] = useState(null); const toast = useToast(); const [exists, setExists] = useState(existsInRemote); - function handleClick() { + const errFn = (err) => { + setExists(false); + setLoading(false); + toast({ + title: err.response?.data?.error || err.response?.data || err.message, + status: 'error', + isClosable: true, + }); + setError(err); + } + + function handleSoftwareClick() { + // POST https://houston.BASEDOMAIN/v1 setLoading(true); - axios.post(route, sendData, { headers }) + axios.post( + route, + { + operationName: "deploymentVariables", + query: getDeploymentsQuery, + variables: { + "deploymentUuid": deploymentId, + "releaseName": releaseName, + } + }, + { headers } + ) .then((res) => { - setLoading(false); - setExists(res.status === 200); + let variables = res.data?.data?.deploymentVariables || []; + // TODO - DEDUPE? 
Check if key already exists and reject + variables.push(sendData); + axios.post( + route, + { + operationName: "UpdateDeploymentVariables", + query: updateDeploymentVariablesMutation, + variables: { + "deploymentUuid": deploymentId, + "releaseName": releaseName, + "environmentVariables": variables, + } + }, + { headers } + ) + .then((res) => { + setLoading(false); + setExists(res.status === 200); + }) + .catch(errFn); }) - .catch((err) => { - setExists(false); - setLoading(false); - toast({ - title: err.response?.data?.error || err.response?.data || err.message, - status: 'error', - isClosable: true, - }); - setError(err); - }); + .catch(errFn); } + function handleAstroClick() { + setLoading(true); + // GET/POST https://api.astronomer.io/platform/v1beta1/organizations/:organizationId/deployments/:deploymentId + axios.get(route, { headers }) + .then((res) => { + // TODO - DEDUPE? Check if key already exists and reject + res.data?.environmentVariables.push(sendData); + axios.post(route, res.data, { headers }) + .then((res) => { + setLoading(false); + setExists(res.status === 200); + }) + .catch(errFn); + }) + .catch(errFn); + } return ( - - - + onClick={() => isAstro ? handleAstroClick() : handleSoftwareClick()} + > + {exists ? 'Ok' : loading ? '' : error ? 'Error!' 
: 'Migrate'} + ); } -MigrateEnvButton.propTypes = { +EnvVarMigrateButton.propTypes = { route: PropTypes.string.isRequired, headers: PropTypes.objectOf(PropTypes.string), existsInRemote: PropTypes.bool, // eslint-disable-next-line react/forbid-prop-types sendData: PropTypes.object.isRequired, + deploymentId: PropTypes.string, + releaseName: PropTypes.string, }; -MigrateEnvButton.defaultProps = { +EnvVarMigrateButton.defaultProps = { headers: {}, existsInRemote: false, + deploymentId: null, + releaseName: null, }; const columnHelper = createColumnHelper(); +const valueColumn = columnHelper.accessor('value', { + id: 'value', cell: (props) => , +}); function setEnvData(localData, remoteData) { return Object.entries(localData).map( @@ -104,30 +160,38 @@ export default function EnvVarsPage({ state, dispatch }) { ); useEffect(() => fetchPageData(), []); useEffect( - () => setData(setEnvData(state.envLocalData, state.envRemoteData)), + () => { + setData(setEnvData(state.envLocalData, state.envRemoteData)) + }, [state], ); + // // noinspection JSCheckFunctionSignatures const columns = [ columnHelper.accessor('key'), - columnHelper.accessor('value'), + valueColumn, columnHelper.display({ id: 'migrate', header: 'Migrate', // eslint-disable-next-line react/no-unstable-nested-components cell: (info) => ( - + deploymentId={state.deploymentId} + releaseName={state.releaseName} + /> ), }), ]; diff --git a/astronomer_starship/src/pages/SetupPage.jsx b/astronomer_starship/src/pages/SetupPage.jsx index dbeed73..dfab486 100644 --- a/astronomer_starship/src/pages/SetupPage.jsx +++ b/astronomer_starship/src/pages/SetupPage.jsx @@ -1,38 +1,92 @@ import { Box, + Button, Divider, - VStack, - Text, - InputGroup, - Input, - InputRightAddon, - InputLeftAddon, - FormLabel, + Fade, FormControl, - Switch, + FormErrorMessage, + FormHelperText, + FormLabel, HStack, + Input, + InputGroup, + InputLeftAddon, + InputRightAddon, + InputRightElement, Link, SlideFade, - Button, - Fade, - 
FormErrorMessage, - FormHelperText, - InputRightElement, useColorMode, Spacer, + Spacer, + Switch, + Text, + VStack, } from '@chakra-ui/react'; -import React from 'react'; +import React, { useEffect } from 'react'; import PropTypes from 'prop-types'; -import { - CheckIcon, ExternalLinkIcon, RepeatIcon, -} from '@chakra-ui/icons'; -import { getTargetUrlFromParts, tokenUrlFromAirflowUrl } from '../util'; +import { CheckIcon, ExternalLinkIcon, RepeatIcon, } from '@chakra-ui/icons'; +import { IoTelescopeOutline } from 'react-icons/io5'; +import { NavLink } from 'react-router-dom'; +import { getHoustonRoute, getTargetUrlFromParts, proxyHeaders, proxyUrl, tokenUrlFromAirflowUrl } from '../util'; import ValidatedUrlCheckbox from '../component/ValidatedUrlCheckbox'; +import axios from "axios"; +import { getWorkspaceDeploymentsQuery } from "../constants.js"; export default function SetupPage({ state, dispatch }) { + // Get the workspace ID & etc. if it's software and setup is completed + useEffect( + () => { + if ( + state.isSetupComplete && // setup is completed + !state.isAstro && // it's Software + !(state.releaseName && state.workspaceId && state.deploymentId) // one or more of three isn't set + ){ + axios.post( + proxyUrl(getHoustonRoute(state.urlOrgPart)), + { + operationName: "workspaces", + query: getWorkspaceDeploymentsQuery, + variables: {} + }, + { + headers: proxyHeaders(state.token) + } + ) + .then((res) => { + let found = false; + for (let workspace of res.data?.data?.workspaces) { + if (found) break; + for (let deployment of workspace.deployments) { + if (found) break; + if (deployment.releaseName === state.urlDeploymentPart) { + dispatch({ + type: 'set-software-info', + deploymentId: deployment.id, + releaseName: deployment.releaseName, + workspaceId: workspace.id + }); + } + } + } + res.data?.data?.workspaces + }) + .catch((err) => {}); + } + }, + [state], + ); + return ( Starship is a utility to migrate Airflow metadata between instances + + + + + + + + + 
); +} diff --git a/astronomer_starship/src/util.js b/astronomer_starship/src/util.js index ae963ea..6c2dbad 100644 --- a/astronomer_starship/src/util.js +++ b/astronomer_starship/src/util.js @@ -3,7 +3,7 @@ import axios from 'axios'; /** * Returns the PAT URL for a given Airflow URL * for Astro that's https://cloud.astronomer.io/token - * for Software (like https://baseurl.domain.com/airflow/...) it's https://baseurl.domain.com/token + * for Software (like https://deployments.basedomain.com/airflow/...) it's https://basedomain.com/token * @param targetUrl * @returns {string} * @@ -11,13 +11,16 @@ import axios from 'axios'; * 'https://cloud.astronomer.io/token' */ export function tokenUrlFromAirflowUrl(targetUrl) { + // Software if (!targetUrl.includes('astronomer.run')) { const urlBody = targetUrl.split('://')[1]; if (urlBody) { const url = urlBody.split('/', 1)[0] || urlBody; - return `https://${url}/token`; + const basedomain = url.split('deployments.', 2)[1] || url; + return `https://${basedomain}/token`; } } + // Astro return 'https://cloud.astronomer.io/token'; } @@ -31,7 +34,7 @@ export function tokenUrlFromAirflowUrl(targetUrl) { export function getTargetUrlFromParts(urlOrgPart, urlDeploymentPart, isAstro) { return isAstro ? 
`https://${urlOrgPart}.astronomer.run/${urlDeploymentPart}` - : `https://${urlOrgPart}/${urlDeploymentPart}/airflow`; + : `https://deployments.${urlOrgPart}/${urlDeploymentPart}/airflow`; } /** @@ -98,7 +101,16 @@ export function fetchData( .then((res) => { axios .get(proxyUrl(remoteRouteUrl), { headers: proxyHeaders(token) }) - .then((rRes) => dataDispatch(res, rRes)) // , dispatch)) + .then((rRes) => { + if ( + res.status === 200 && res.headers['content-type'] === 'application/json' && + rRes.status === 200 && res.headers['content-type'] === 'application/json' + ){ + dataDispatch(res, rRes) + } else { + errorDispatch('Invalid response'); + } + }) // , dispatch)) .catch((err) => errorDispatch(err)); // , dispatch)); }) .catch((err) => errorDispatch(err)); // , dispatch)); @@ -108,3 +120,24 @@ export function objectWithoutKey(object, key) { const { [key]: _, ...otherKeys } = object; return otherKeys; } + +/** + * Constructs and returns the URL for the Astro Deployment Environment Variable API route + * + * @param {string} organizationId + * @param {string} deploymentId + * @returns {string} - The URL for the Astro Environment Variable service. + */ +export function getAstroEnvVarRoute(organizationId, deploymentId) { + return `https://api.astronomer.io/platform/v1beta1/organizations/${organizationId}/deployments/${deploymentId}`; +} + +/** + * Constructs and returns the URL for a Houston API + * + * @param {string} basedomain + * @returns {string} - The URL for the Houston service. 
+ */ +export function getHoustonRoute(basedomain) { + return `https://houston.${basedomain}/v1/`; +} diff --git a/astronomer_starship/starship_api.py b/astronomer_starship/starship_api.py index 4de6dcd..0d99cd5 100644 --- a/astronomer_starship/starship_api.py +++ b/astronomer_starship/starship_api.py @@ -2,20 +2,67 @@ from functools import partial import flask +import requests from airflow.plugins_manager import AirflowPlugin from airflow.www.app import csrf from flask import Blueprint, request, jsonify from flask_appbuilder import expose, BaseView +from astronomer_starship.compat.starship_compatability import ( + StarshipCompatabilityLayer, + get_kwargs_fn, +) + +from typing import Any, Dict, List, Union from typing import TYPE_CHECKING if TYPE_CHECKING: from typing import Callable -from astronomer_starship.compat.starship_compatability import ( - StarshipCompatabilityLayer, - get_kwargs_fn, -) + +def get_json_or_clean_str(o: str) -> Union[List[Any], Dict[Any, Any], Any]: + """For Aeroscope - Either load JSON (if we can) or strip and split the string, while logging the error""" + from json import JSONDecodeError + import logging + + try: + return json.loads(o) + except (JSONDecodeError, TypeError) as e: + logging.debug(e) + logging.debug(o) + return o.strip() + + +def clean_airflow_report_output(log_string: str) -> Union[dict, str]: + r"""For Aeroscope - Look for the magic string from the Airflow report and then decode the base64 and convert to json + Or return output as a list, trimmed and split on newlines + >>> clean_airflow_report_output('INFO 123 - xyz - abc\n\n\nERROR - 1234\n%%%%%%%\naGVsbG8gd29ybGQ=') + 'hello world' + >>> clean_airflow_report_output( + ... 'INFO 123 - xyz - abc\n\n\nERROR - 1234\n%%%%%%%\neyJvdXRwdXQiOiAiaGVsbG8gd29ybGQifQ==' + ... 
) + {'output': 'hello world'} + """ + from json import JSONDecodeError + import base64 + + log_lines = log_string.split("\n") + enumerated_log_lines = list(enumerate(log_lines)) + found_i = -1 + for i, line in enumerated_log_lines: + if "%%%%%%%" in line: + found_i = i + 1 + break + if found_i != -1: + output = base64.decodebytes( + "\n".join(log_lines[found_i:]).encode("utf-8") + ).decode("utf-8") + try: + return json.loads(output) + except JSONDecodeError: + return get_json_or_clean_str(output) + else: + return get_json_or_clean_str(log_string) def starship_route( @@ -32,9 +79,9 @@ def starship_route( kwargs = ( kwargs_fn( request_method=request_method, - args=request.args - if request_method in ["GET", "POST", "DELETE"] - else {}, + args=( + request.args if request_method in ["GET", "POST", "DELETE"] else {} + ), json=(request.json if request.is_json else {}), ) if kwargs_fn @@ -126,6 +173,54 @@ def ok(): return starship_route(get=ok) + @expose("/telescope", methods=["GET"]) + @csrf.exempt + def telescope(self): + from socket import gethostname + import io + import runpy + from urllib.request import urlretrieve + from contextlib import redirect_stdout, redirect_stderr + from urllib.error import HTTPError + from datetime import datetime, timezone + import os + + aero_version = os.getenv("TELESCOPE_REPORT_RELEASE_VERSION", "latest") + a = "airflow_report.pyz" + aero_url = ( + "https://github.com/astronomer/telescope/releases/latest/download/airflow_report.pyz" + if aero_version == "latest" + else f"https://github.com/astronomer/telescope/releases/download/{aero_version}/airflow_report.pyz" + ) + try: + urlretrieve(aero_url, a) + except HTTPError as e: + raise RuntimeError( + f"Error finding specified version:{aero_version} -- Reason:{e.reason}" + ) + + s = io.StringIO() + with redirect_stdout(s), redirect_stderr(s): + runpy.run_path(a) + report = { + "telescope_version": "aeroscope-latest", + "report_date": datetime.now(timezone.utc).isoformat()[:10], + 
"organization_name": request.args["organization"], + "local": { + gethostname(): { + "airflow_report": clean_airflow_report_output(s.getvalue()) + } + }, + } + presigned_url = request.args.get("presigned_url", False) + if presigned_url: + try: + upload = requests.put(presigned_url, data=json.dumps(report)) + return upload.content, upload.status_code + except requests.exceptions.ConnectionError as e: + return str(e), 400 + return report + @expose("/airflow_version", methods=["GET"]) @csrf.exempt def airflow_version(self) -> str: @@ -408,26 +503,30 @@ def dag_runs(self): **Response**: ```json - [ - { - "dag_id": "dag_0", - "queued_at": "1970-01-01T00:00:00+00:00", - "execution_date": "1970-01-01T00:00:00+00:00", - "start_date": "1970-01-01T00:00:00+00:00", - "end_date": "1970-01-01T00:00:00+00:00", - "state": "SUCCESS", - "run_id": "manual__1970-01-01T00:00:00+00:00", - "creating_job_id": 123, - "external_trigger": true, - "run_type": "manual", - "conf": None, - "data_interval_start": "1970-01-01T00:00:00+00:00", - "data_interval_end": "1970-01-01T00:00:00+00:00", - "last_scheduling_decision": "1970-01-01T00:00:00+00:00", - "dag_hash": "...." - }, - ... - ] + { + "dag_run_count": 1, + "dag_runs": + [ + { + "dag_id": "dag_0", + "queued_at": "1970-01-01T00:00:00+00:00", + "execution_date": "1970-01-01T00:00:00+00:00", + "start_date": "1970-01-01T00:00:00+00:00", + "end_date": "1970-01-01T00:00:00+00:00", + "state": "SUCCESS", + "run_id": "manual__1970-01-01T00:00:00+00:00", + "creating_job_id": 123, + "external_trigger": true, + "run_type": "manual", + "conf": None, + "data_interval_start": "1970-01-01T00:00:00+00:00", + "data_interval_end": "1970-01-01T00:00:00+00:00", + "last_scheduling_decision": "1970-01-01T00:00:00+00:00", + "dag_hash": "...." + }, + ... 
+ ] + } ``` ### `POST /api/starship/dag_runs` diff --git a/docs/api.md b/docs/api.md index 68cb652..012ab4f 100644 --- a/docs/api.md +++ b/docs/api.md @@ -1,4 +1,19 @@ # API + +## Error Responses +In the event of an error, the API will return a JSON response with an `error` key +and an HTTP `status_code`. The `error` key will contain a message describing the error. + +| **Type** | **Status Code** | **Response Example** | +|-----------------------------------|-----------------|---------------------------------------------------------------------------------------------| +| **Request kwargs - RuntimeError** | 400 | ```{"error": "..."}``` | +| **Request kwargs - Exception** | 500 | ```{"error": "Unknown Error in kwargs_fn - ..."}``` | +| **Unknown Error** | 500 | ```{"error": "Unknown Error", "error_type": ..., "error_message": ..., "kwargs": ...}``` | +| **`POST` Integrity Error** | 409 | ```{"error": "Integrity Error (Duplicate Record?)", "error_message": ..., "kwargs": ...}``` | +| **`POST` Data Error** | 400 | ```{"error": "Data Error", "error_message": ..., "kwargs": ...}``` | +| **`POST` SQL Error** | 400 | ```{"error": "SQL Error", "error_message": ..., "kwargs": ...}``` | + + ## Airflow Version ::: astronomer_starship.starship_api.StarshipApi.airflow_version options: diff --git a/docs/index.md b/docs/index.md index b0536f2..bbf3b9b 100644 --- a/docs/index.md +++ b/docs/index.md @@ -17,7 +17,7 @@ Connections, Environment Variables, Pools, and DAG History between two Airflow i

Logo of Starship

diff --git a/docs/operator.md b/docs/operator.md index e685a9d..eadb458 100644 --- a/docs/operator.md +++ b/docs/operator.md @@ -1,69 +1,80 @@ -The Starship Operator should be used in instances where the Airflow Webserver is unable to correctly host a Plugin. +# Starship Migration DAG +The `StarshipAirflowMigrationDAG` can be used to migrate Airflow Variables, Pools, Connections, +and DAG History from one Airflow instance to another. -The `AstroMigrationOperator` should be used if migrating from a +The `StarshipAirflowMigrationDAG` should be used in instances where the **source** Airflow Webserver +is unable to correctly host a Plugin. The Target must still have a functioning Starship Plugin installed, +be running the same version of Airflow, and have the same set of DAGs deployed. + +The `StarshipAirflowMigrationDAG` should be used if migrating from a Google Cloud Composer 1 (with Airflow 2.x) or MWAA v2.0.2 environment. -These environments do not support webserver plugins and will require using the `AstroMigrationOperator` +These environments do not support webserver plugins and will require using the `StarshipAirflowMigrationDAG` to migrate data. ## Installation Add the following line to your `requirements.txt` in your source environment: ``` - astronomer-starship==1.2.1 + astronomer-starship[provider] ``` +## Setup +Make a connection in Airflow with the following details: +- **Conn ID**: `starship_default` +- **Conn Type**: `HTTP` +- **Host**: the URL of the homepage of Airflow (excluding `/home` on the end of the URL) + - For example, if your deployment URL is `https://astronomer.astronomer.run/abcdt4ry/home`, you'll use `https://astronomer.astronomer.run/abcdt4ry` +- **Schema**: `https` +- **Extras**: `{"Authorization": "Bearer "}` + ## Usage 1. 
Add the following DAG to your source environment: - ```python title="dags/astronomer_migration_dag.py" - from airflow import DAG + ```python title="dags/starship_airflow_migration_dag.py" + from astronomer_starship.providers.starship.operators.starship import StarshipAirflowMigrationDAG - from astronomer.starship.operators import AstroMigrationOperator - from datetime import datetime + globals()['starship_airflow_migration_dag'] = StarshipAirflowMigrationDAG(http_conn_id="starship_default") + ``` - with DAG( - dag_id="astronomer_migration_dag", - start_date=datetime(2020, 8, 15), - schedule_interval=None, - ) as dag: +2. Unpause the DAG in the Airflow UI +3. Once the DAG successfully runs, your connections, variables, and environment variables should all be migrated to Astronomer - AstroMigrationOperator( - task_id="export_meta", - deployment_url='{{ dag_run.conf["deployment_url"] }}', - token='{{ dag_run.conf["astro_token"] }}', - ) - ``` +## Configuration -3. Deploy this DAG to your source Airflow environment, configured as described in the **Configuration** section below -4. 
Once the DAG is available in the Airflow UI, click the "Trigger DAG" button, then click "Trigger DAG w/ config", and input the following in the configuration dictionary: - - `astro_token`: To retrieve anf Astronomer token, navigate to [cloud.astronomer.io/token](https://cloud.astronomer.io/token) and log in using your Astronomer credentials - - `deployment_url`: To retrieve a deployment URL - navigate to the Astronomer Airlow deployment that you'd like to migrate to in the Astronomer UI, click `Open Airflow` and copy the page URL (excluding `/home` on the end of the URL) - - For example, if your deployment URL is `https://astronomer.astronomer.run/abcdt4ry/home`, you'll use `https://astronomer.astronomer.run/abcdt4ry` - - The config dictionary used when triggering the DAG should be formatted as: - - ```json - { - "deployment_url": "your-deployment-url", - "astro_token": "your-astro-token" - } - ``` -5. Once the DAG successfully runs, your connections, variables, and environment variables should all be migrated to Astronomer +The `StarshipAirflowMigrationDAG` can be configured as follows: -### Configuration +```python +StarshipAirflowMigrationDAG( + http_conn_id="starship_default", + variables=None, # None to migrate all, or ["var1", "var2"] to migrate specific items, or empty list to skip all + pools=None, # None to migrate all, or ["pool1", "pool2"] to migrate specific items, or empty list to skip all + connections=None, # None to migrate all, or ["conn1", "conn2"] to migrate specific items, or empty list to skip all + dag_ids=None, # None to migrate all, or ["dag1", "dag2"] to migrate specific items, or empty list to skip all +) +``` -The `AstroMigrationOperator` can be configured as follows: +You can use this DAG to migrate all items, or specific items by providing a list of names. -- `variables_exclude_list`: List the individual Airflow Variables which you **do not** want to be migrated. 
Any Variables not listed will be migrated to the desination Airflow deployment. -- `connection_exclude_list`: List the individual Airflow Connections which you **do not** want to be migrated. Any Variables not listed will be migrated to the desination Airflow deployment. -- `env_include_list`: List the individual Environment Variables which you **do** want to be migrated. Only the Environment Variables listed will be migrated to the desination Airflow deployment. None are migrated by default. +You can skip migration by providing an empty list. - ```python - AstroMigrationOperator( - task_id="export_meta", - deployment_url='{{ dag_run.conf["deployment_url"] }}', - token='{{ dag_run.conf["astro_token"] }}', - variables_exclude_list=["some_var_1"], - connection_exclude_list=["some_conn_1"], - env_include_list=["FOO", "BAR"], - ) - ``` +## Python API + +### Hooks + +::: astronomer_starship.providers.starship.hooks.starship + options: + heading_level: 4 + show_root_toc_entry: false + show_root_heading: false + inherited_members: true + show_source: false + +### Operators, TaskGroups, DAG + +::: astronomer_starship.providers.starship.operators.starship + options: + heading_level: 4 + show_root_toc_entry: false + show_root_heading: false + inherited_members: true + show_source: false diff --git a/starship.png b/docs/starship.png similarity index 100% rename from starship.png rename to docs/starship.png diff --git a/starship.svg b/docs/starship.svg similarity index 100% rename from starship.svg rename to docs/starship.svg diff --git a/starship_diagram.svg b/docs/starship_diagram.svg similarity index 100% rename from starship_diagram.svg rename to docs/starship_diagram.svg diff --git a/mkdocs.yml b/mkdocs.yml index 94619b2..f8d16ce 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -6,6 +6,9 @@ site_url: https://astronomer.github.io/starship/ theme: name: material + logo: starship.svg + favicon: starship.png + palette: # Palette toggle for light mode - media: 
"(prefers-color-scheme: light)" @@ -45,6 +48,9 @@ theme: - search.suggest markdown_extensions: +- toc: + permalink: true + - pymdownx.superfences - pymdownx.highlight: use_pygments: true diff --git a/pyproject.toml b/pyproject.toml index 821f3db..fe063fe 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,6 +43,10 @@ exclude = [ ] [project.optional-dependencies] +provider = [ + "apache-airflow-providers-http" +] + dev = [ # package "twine",