From b854d534625e8063ee76d91a9f0d38a0ee90eaca Mon Sep 17 00:00:00 2001 From: atulya-astronomer Date: Tue, 17 Dec 2024 17:41:30 +0530 Subject: [PATCH] Added delete button for Variables, Pools, Connections and Dag Runs (#114) * Added delete APIs * Added delete button to DAG page * Changed to params from body * Added tests and updated version to 2.2 * Updated documentation * Added docker tests All tests passing. Merging and creating a new release --- astronomer_starship/__init__.py | 2 +- .../compat/starship_compatability.py | 69 +++++++++++++++++-- .../src/component/MigrateButton.jsx | 33 ++++++--- .../src/pages/DAGHistoryPage.jsx | 52 +++++++++++--- astronomer_starship/starship.py | 2 +- astronomer_starship/starship_api.py | 52 ++++++++++++-- tests/api_integration_test.py | 14 +++- tests/docker_test/docker_test.py | 18 +++++ 8 files changed, 212 insertions(+), 30 deletions(-) diff --git a/astronomer_starship/__init__.py b/astronomer_starship/__init__.py index 0741d2a..d031897 100644 --- a/astronomer_starship/__init__.py +++ b/astronomer_starship/__init__.py @@ -1,4 +1,4 @@ -__version__ = "2.1.0" +__version__ = "2.2.0" def get_provider_info(): diff --git a/astronomer_starship/compat/starship_compatability.py b/astronomer_starship/compat/starship_compatability.py index 413e595..f6fa864 100644 --- a/astronomer_starship/compat/starship_compatability.py +++ b/astronomer_starship/compat/starship_compatability.py @@ -1,6 +1,7 @@ import json +import logging import os -from flask import jsonify +from flask import jsonify, Response from sqlalchemy.orm import Session from typing import TYPE_CHECKING @@ -12,6 +13,9 @@ import pytz +logger = logging.getLogger(__name__) + + def get_from_request(args, json, key, required: bool = False) -> "Any": val = json.get(key, args.get(key)) if val is None and required: @@ -146,6 +150,24 @@ def generic_set_one(session: Session, qualname: str, attrs: dict, **kwargs): raise e +def generic_delete(session: Session, qualname: str, **kwargs) -> Response: + from http import HTTPStatus + from sqlalchemy import delete + + (_, thing_cls) = import_from_qualname(qualname) + + try: + filters = [getattr(thing_cls, attr) == val for attr, val in kwargs.items()] + deleted_rows = session.execute(delete(thing_cls).where(*filters)).rowcount + session.commit() + logger.info(f"Deleted {deleted_rows} rows for table {qualname}") + return Response(status=HTTPStatus.NO_CONTENT) + except Exception as e: + logger.error(f"Error deleting row(s) for table {qualname}: {e}") + session.rollback() + raise e + + def get_test_data(attrs: dict, method: "Union[str, None]" = None) -> "Dict[str, Any]": """ >>> get_test_data(method="POST", attrs={"key": {"attr": "key", "methods": [("POST", True)], "test_value": "key"}}) @@ -195,10 +217,21 @@ def set_env_vars(cls): res.status_code = 409 raise NotImplementedError() + @classmethod + def delete_env_vars(cls): + """This is not possible to do via API, so return an error""" + res = jsonify({"error": "Not implemented"}) + res.status_code = 405 + raise NotImplementedError() + @classmethod def variable_attrs(cls) -> "Dict[str, AttrDesc]": return { - "key": {"attr": "key", "methods": [("POST", True)], "test_value": "key"}, + "key": { + "attr": "key", + "methods": [("POST", True), ("DELETE", True)], + "test_value": "key", + }, "val": {"attr": "val", "methods": [("POST", True)], "test_value": "val"}, "description": { "attr": "description", @@ -217,12 +250,16 @@ def set_variable(self, **kwargs): self.session, "airflow.models.Variable", self.variable_attrs(), **kwargs ) + def delete_variable(self, **kwargs): + attrs = {self.variable_attrs()[k]["attr"]: v for k, v in kwargs.items()} + return generic_delete(self.session, "airflow.models.Variable", **attrs) + @classmethod def pool_attrs(cls) -> "Dict[str, AttrDesc]": return { "name": { "attr": "pool", - "methods": [("POST", True)], + "methods": [("POST", True), ("DELETE", True)], "test_value": "test_name", }, "slots": {"attr": "slots", "methods": [("POST", True)], "test_value": 1}, @@ -241,12 +278,20 @@ def set_pool(self, **kwargs): self.session, "airflow.models.Pool", self.pool_attrs(), **kwargs ) + def delete_pool(self, **kwargs): + attrs = { + self.pool_attrs()[k]["attr"]: v + for k, v in kwargs.items() + if k in self.pool_attrs() + } + return generic_delete(self.session, "airflow.models.Pool", **attrs) + @classmethod def connection_attrs(cls) -> "Dict[str, AttrDesc]": return { "conn_id": { "attr": "conn_id", - "methods": [("POST", True)], + "methods": [("POST", True), ("DELETE", True)], "test_value": "conn_id", }, "conn_type": { @@ -301,6 +346,10 @@ def set_connection(self, **kwargs): self.session, "airflow.models.Connection", self.connection_attrs(), **kwargs ) + def delete_connection(self, **kwargs): + attrs = {self.connection_attrs()[k]["attr"]: v for k, v in kwargs.items()} + return generic_delete(self.session, "airflow.models.Connection", **attrs) + @classmethod def dag_attrs(cls) -> "Dict[str, AttrDesc]": return { @@ -442,7 +491,7 @@ def dag_runs_attrs(cls) -> "Dict[str, AttrDesc]": return { "dag_id": { "attr": "dag_id", - "methods": [("GET", True)], + "methods": [("GET", True), ("DELETE", True)], "test_value": "dag_0", }, # Limit is the number of rows to return. @@ -591,6 +640,10 @@ def set_dag_runs(self, dag_runs: list): dag_runs = self.insert_directly("dag_run", dag_runs) return {"dag_runs": dag_runs, "dag_run_count": self._get_dag_run_count(dag_id)} + def delete_dag_runs(self, **kwargs): + attrs = {self.dag_runs_attrs()[k]["attr"]: v for k, v in kwargs.items()} + return generic_delete(self.session, "airflow.models.DagRun", **attrs) + @classmethod def task_instances_attrs(cls) -> "Dict[str, AttrDesc]": epoch = datetime.datetime(1970, 1, 1, 0, 0) @@ -600,7 +653,7 @@ def task_instances_attrs(cls) -> "Dict[str, AttrDesc]": return { "dag_id": { "attr": "dag_id", - "methods": [("GET", True)], + "methods": [("GET", True), ("DELETE", True)], "test_value": "dag_0", }, # Limit is the number of rows to return. @@ -853,6 +906,10 @@ def set_task_instances(self, task_instances: list): task_instances = self.insert_directly("task_instance", task_instances) return {"task_instances": task_instances} + def delete_task_instances(self, **kwargs): + attrs = {self.task_instances_attrs()[k]["attr"]: v for k, v in kwargs.items()} + return generic_delete(self.session, "airflow.models.TaskInstance", **attrs) + def insert_directly(self, table_name, items): from sqlalchemy.exc import InvalidRequestError from sqlalchemy import MetaData diff --git a/astronomer_starship/src/component/MigrateButton.jsx b/astronomer_starship/src/component/MigrateButton.jsx index a68644c..08829b6 100644 --- a/astronomer_starship/src/component/MigrateButton.jsx +++ b/astronomer_starship/src/component/MigrateButton.jsx @@ -2,11 +2,16 @@ import React, { useState } from 'react'; import axios from 'axios'; import { Button, useToast } from '@chakra-ui/react'; -import { MdErrorOutline } from 'react-icons/md'; -import { FaCheck } from 'react-icons/fa'; +import { MdErrorOutline, MdDeleteForever } from 'react-icons/md'; import { GoUpload } from 'react-icons/go'; import PropTypes from 'prop-types'; +function checkStatus(status, exists) { + if (status === 204) + return false; + return status === 200 || exists; +} + export default function MigrateButton({ route, headers, existsInRemote, sendData, isDisabled, }) { @@ -16,13 +21,23 @@ export default function MigrateButton({ const [exists, setExists] = useState(existsInRemote); function handleClick() { setLoading(true); - axios.post(route, sendData, { headers }) + axios({ + method: exists ? 'delete' : 'post', + url: route, + headers, + params: sendData, + }) .then((res) => { setLoading(false); - setExists(res.status === 200); + setExists(checkStatus(res.status, exists)); + toast({ + title: 'Success', + status: 'success', + isClosable: true, + }) }) .catch((err) => { - setExists(false); + setExists(exists); setLoading(false); toast({ title: err.response?.data?.error || err.response?.data || err.message, @@ -34,19 +49,19 @@ export default function MigrateButton({ } return ( ); } diff --git a/astronomer_starship/src/pages/DAGHistoryPage.jsx b/astronomer_starship/src/pages/DAGHistoryPage.jsx index 1f73a8e..03d6cf3 100644 --- a/astronomer_starship/src/pages/DAGHistoryPage.jsx +++ b/astronomer_starship/src/pages/DAGHistoryPage.jsx @@ -19,9 +19,8 @@ import { } from '@chakra-ui/react'; import PropTypes from 'prop-types'; import axios from 'axios'; -import { MdErrorOutline } from 'react-icons/md'; +import { MdErrorOutline, MdDeleteForever } from 'react-icons/md'; import { GrDocumentMissing } from 'react-icons/gr'; -import { FaCheck } from 'react-icons/fa'; import { GoUpload } from 'react-icons/go'; import humanFormat from 'human-format'; import { ExternalLinkIcon, RepeatIcon } from '@chakra-ui/icons'; @@ -56,6 +55,44 @@ function DAGHistoryMigrateButton({ const percent = 100; function handleClick() { + + function deleteRuns() { + setLoadPerc(percent * 0.5); + axios({ + method: 'delete', + url: proxyUrl(url + constants.DAG_RUNS_ROUTE), + headers: proxyHeaders(token), + params: { dag_id: dagId }, + }).then((res) => { + setExists(!(res.status === 204)); + dispatch({ + type: 'set-dags-data', + dagsData: { + [dagId]: { + remote: { + dag_run_count: 0, + }, + }, + }, + }); + setLoadPerc(percent * 1); + setLoadPerc(0); + }).catch((err) => { + setExists(false); + setLoadPerc(percent * 0); + toast({ + title: err.response?.data?.error || err.response?.data || err.message, + status: 'error', + isClosable: true, + }); + setError(err); + }); + } + + if (exists) { + deleteRuns(); + return; + } const errFn = (err) => { setExists(false); // noinspection PointlessArithmeticExpressionJS @@ -117,23 +154,23 @@ function DAGHistoryMigrateButton({ return (