Skip to content

Commit

Permalink
Merge branch 'main' into feature/http-extra-options-check-response
Browse files Browse the repository at this point in the history
  • Loading branch information
dabla authored Jan 7, 2025
2 parents 564e18b + 4ab8e5e commit 35dbb8b
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 22 deletions.
4 changes: 2 additions & 2 deletions airflow/models/serialized_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,11 +194,11 @@ def write_dag(

log.debug("Checking if DAG (%s) changed", dag.dag_id)
new_serialized_dag = cls(dag)
serialized_dag_db = session.execute(
serialized_dag_hash = session.scalars(
select(cls.dag_hash).where(cls.dag_id == dag.dag_id).order_by(cls.created_at.desc())
).first()

if serialized_dag_db is not None and serialized_dag_db.dag_hash == new_serialized_dag.dag_hash:
if serialized_dag_hash is not None and serialized_dag_hash == new_serialized_dag.dag_hash:
log.debug("Serialized DAG (%s) is unchanged. Skipping writing to DB", dag.dag_id)
return False
dagv = DagVersion.write_dag(
Expand Down
13 changes: 7 additions & 6 deletions airflow/ui/src/components/ClearRun/ClearRunButton.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,18 @@ import { Box, useDisclosure } from "@chakra-ui/react";
import { useState } from "react";
import { FiRefreshCw } from "react-icons/fi";

import type { TaskInstanceCollectionResponse } from "openapi/requests/types.gen";
import type { DAGRunResponse, TaskInstanceCollectionResponse } from "openapi/requests/types.gen";
import { useClearDagRun } from "src/queries/useClearRun";

import ActionButton from "../ui/ActionButton";
import ClearRunDialog from "./ClearRunDialog";

type Props = {
readonly dagId: string;
readonly dagRunId: string;
readonly dagRun: DAGRunResponse;
readonly withText?: boolean;
};

const ClearRunButton = ({ dagId, dagRunId, withText = true }: Props) => {
const ClearRunButton = ({ dagRun, withText = true }: Props) => {
const { onClose, onOpen, open } = useDisclosure();

const [onlyFailed, setOnlyFailed] = useState(false);
Expand All @@ -42,6 +41,9 @@ const ClearRunButton = ({ dagId, dagRunId, withText = true }: Props) => {
total_entries: 0,
});

const dagId = dagRun.dag_id;
const dagRunId = dagRun.dag_run_id;

const { isPending, mutate } = useClearDagRun({
dagId,
dagRunId,
Expand All @@ -68,8 +70,7 @@ const ClearRunButton = ({ dagId, dagRunId, withText = true }: Props) => {

<ClearRunDialog
affectedTasks={affectedTasks}
dagId={dagId}
dagRunId={dagRunId}
dagRun={dagRun}
isPending={isPending}
mutate={mutate}
onClose={onClose}
Expand Down
29 changes: 22 additions & 7 deletions airflow/ui/src/components/ClearRun/ClearRunDialog.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,23 @@
* under the License.
*/
import { Flex, Heading, VStack } from "@chakra-ui/react";
import { useState } from "react";
import { FiRefreshCw } from "react-icons/fi";

import type { DAGRunClearBody, TaskInstanceCollectionResponse } from "openapi/requests/types.gen";
import type {
DAGRunClearBody,
DAGRunResponse,
TaskInstanceCollectionResponse,
} from "openapi/requests/types.gen";
import { Button, Dialog } from "src/components/ui";
import { usePatchDagRun } from "src/queries/usePatchDagRun";

import SegmentedControl from "../ui/SegmentedControl";
import ClearRunTasksAccordion from "./ClearRunTaskAccordion";

type Props = {
readonly affectedTasks: TaskInstanceCollectionResponse;
readonly dagId: string;
readonly dagRunId: string;
readonly dagRun: DAGRunResponse;
readonly isPending: boolean;
readonly mutate: ({
dagId,
Expand All @@ -47,15 +52,20 @@ type Props = {

const ClearRunDialog = ({
affectedTasks,
dagId,
dagRunId,
dagRun,
isPending,
mutate,
onClose,
onlyFailed,
open,
setOnlyFailed,
}: Props) => {
const dagId = dagRun.dag_id;
const dagRunId = dagRun.dag_run_id;

const [note, setNote] = useState<string | null>(dagRun.note);
const { isPending: isPendingPatchDagRun, mutate: mutatePatchDagRun } = usePatchDagRun({ dagId, dagRunId });

const onChange = (value: string) => {
switch (value) {
case "existing_tasks":
Expand Down Expand Up @@ -108,17 +118,22 @@ const ClearRunDialog = ({
value={onlyFailed ? "only_failed" : "existing_tasks"}
/>
</Flex>
<ClearRunTasksAccordion affectedTasks={affectedTasks} />
<ClearRunTasksAccordion affectedTasks={affectedTasks} note={note} setNote={setNote} />
<Flex justifyContent="end" mt={3}>
<Button
colorPalette="blue"
loading={isPending}
loading={isPending || isPendingPatchDagRun}
onClick={() => {
mutate({
dagId,
dagRunId,
requestBody: { dry_run: false, only_failed: onlyFailed },
});
mutatePatchDagRun({
dagId,
dagRunId,
requestBody: { note },
});
}}
>
<FiRefreshCw /> Confirm
Expand Down
51 changes: 47 additions & 4 deletions airflow/ui/src/components/ClearRun/ClearRunTaskAccordion.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,18 @@
* specific language governing permissions and limitations
* under the License.
*/
import { Box, Text } from "@chakra-ui/react";
import { Box, Editable, Text, VStack } from "@chakra-ui/react";
import { Link } from "@chakra-ui/react";
import type { ColumnDef } from "@tanstack/react-table";
import type { ChangeEvent } from "react";
import Markdown from "react-markdown";
import { Link as RouterLink } from "react-router-dom";

import type { TaskInstanceCollectionResponse, TaskInstanceResponse } from "openapi/requests/types.gen";
import type {
DAGRunResponse,
TaskInstanceCollectionResponse,
TaskInstanceResponse,
} from "openapi/requests/types.gen";
import { DataTable } from "src/components/DataTable";
import { Status, Tooltip } from "src/components/ui";
import { getTaskInstanceLink } from "src/utils/links";
Expand Down Expand Up @@ -69,12 +75,14 @@ const columns: Array<ColumnDef<TaskInstanceResponse>> = [

type Props = {
readonly affectedTasks?: TaskInstanceCollectionResponse;
readonly note: DAGRunResponse["note"];
readonly setNote: (value: string) => void;
};

// Table is in memory, pagination and sorting are disabled.
// TODO: Make a front-end only unconnected table component with client side ordering and pagination
const ClearRunTasksAccordion = ({ affectedTasks }: Props) => (
<Accordion.Root collapsible variant="enclosed">
const ClearRunTasksAccordion = ({ affectedTasks, note, setNote }: Props) => (
<Accordion.Root collapsible defaultValue={["note"]} multiple={false} variant="enclosed">
<Accordion.Item key="tasks" value="tasks">
<Accordion.ItemTrigger>
<Text fontWeight="bold">Affected Tasks: {affectedTasks?.total_entries ?? 0}</Text>
Expand All @@ -98,6 +106,41 @@ const ClearRunTasksAccordion = ({ affectedTasks }: Props) => (
</Box>
</Accordion.ItemContent>
</Accordion.Item>
<Accordion.Item key="note" value="note">
<Accordion.ItemTrigger>
<Text fontWeight="bold">Note</Text>
</Accordion.ItemTrigger>
<Accordion.ItemContent>
<Editable.Root
onChange={(event: ChangeEvent<HTMLInputElement>) => setNote(event.target.value)}
value={note ?? ""}
>
<Editable.Preview
alignItems="flex-start"
as={VStack}
gap="0"
height="200px"
overflowY="auto"
width="100%"
>
{Boolean(note) ? (
<Markdown>{note}</Markdown>
) : (
<Text color="gray" opacity={0.6}>
Add a note...
</Text>
)}
</Editable.Preview>
<Editable.Textarea
data-testid="notes-input"
height="200px"
overflowY="auto"
placeholder="Add a note..."
resize="none"
/>
</Editable.Root>
</Accordion.ItemContent>
</Accordion.Item>
</Accordion.Root>
);

Expand Down
2 changes: 1 addition & 1 deletion airflow/ui/src/pages/Dag/Runs/Runs.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ const columns: Array<ColumnDef<DAGRunResponse>> = [
accessorKey: "clear_dag_run",
cell: ({ row }) => (
<Flex justifyContent="end">
<ClearRunButton dagId={row.original.dag_id} dagRunId={row.original.dag_run_id} withText={false} />
<ClearRunButton dagRun={row.original} withText={false} />
</Flex>
),
enableSorting: false,
Expand Down
2 changes: 1 addition & 1 deletion airflow/ui/src/pages/Run/Header.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export const Header = ({ dagRun }: { readonly dagRun: DAGRunResponse }) => (
</Flex>
</HStack>
<HStack>
<ClearRunButton dagId={dagRun.dag_id} dagRunId={dagRun.dag_run_id} />
<ClearRunButton dagRun={dagRun} />
</HStack>
</Flex>
{dagRun.note === null || dagRun.note.length === 0 ? undefined : (
Expand Down
2 changes: 1 addition & 1 deletion airflow/ui/src/queries/useClearRun.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import { toaster } from "src/components/ui";

const onError = () => {
toaster.create({
description: "Clear Dag Run request failed.",
description: "Clear Dag Run request failed",
title: "Failed to clear the Dag Run",
type: "error",
});
Expand Down
49 changes: 49 additions & 0 deletions airflow/ui/src/queries/usePatchDagRun.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*!
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { useQueryClient } from "@tanstack/react-query";

import {
UseDagRunServiceGetDagRunKeyFn,
useDagRunServiceGetDagRunsKey,
useDagRunServicePatchDagRun,
} from "openapi/queries";
import { toaster } from "src/components/ui";

const onError = () => {
toaster.create({
description: "Patch Dag Run request failed",
title: "Failed to patch the Dag Run",
type: "error",
});
};

export const usePatchDagRun = ({ dagId, dagRunId }: { dagId: string; dagRunId: string }) => {
const queryClient = useQueryClient();

const onSuccess = async () => {
const queryKeys = [UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }), [useDagRunServiceGetDagRunsKey]];

await Promise.all(queryKeys.map((key) => queryClient.invalidateQueries({ queryKey: key })));
};

return useDagRunServicePatchDagRun({
onError,
onSuccess,
});
};

0 comments on commit 35dbb8b

Please sign in to comment.