Skip to content

Commit e964c64

Browse files
kandharvishnukandharvishnuu
andauthored
AIP-84 Get Task Instance Try Details (#43675)
* adding GetTaskInstanceTryDetails * adding test cases * changing status code * removing 401,403 code * removing async * adding types and rebase --------- Co-authored-by: kandharvishnuu <[email protected]>
1 parent c83768a commit e964c64

File tree

12 files changed

+914
-0
lines changed

12 files changed

+914
-0
lines changed

airflow/api_connexion/endpoints/task_instance_endpoint.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -728,6 +728,7 @@ def get_mapped_task_instance_dependencies(
728728
)
729729

730730

731+
@mark_fastapi_migration_done
731732
@security.requires_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
732733
@provide_session
733734
def get_task_instance_try_details(

airflow/api_fastapi/core_api/datamodels/task_instances.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,3 +114,39 @@ class TaskInstancesBatchBody(BaseModel):
114114
page_offset: NonNegativeInt = 0
115115
page_limit: NonNegativeInt = 100
116116
order_by: str | None = None
117+
118+
119+
class TaskInstanceHistoryResponse(BaseModel):
120+
"""TaskInstanceHistory serializer for responses."""
121+
122+
model_config = ConfigDict(populate_by_name=True)
123+
124+
task_id: str
125+
dag_id: str
126+
run_id: str = Field(alias="dag_run_id")
127+
map_index: int
128+
start_date: datetime | None
129+
end_date: datetime | None
130+
duration: float | None
131+
state: TaskInstanceState | None
132+
try_number: int
133+
max_tries: int
134+
task_display_name: str
135+
hostname: str | None
136+
unixname: str | None
137+
pool: str
138+
pool_slots: int
139+
queue: str | None
140+
priority_weight: int | None
141+
operator: str | None
142+
queued_dttm: datetime | None = Field(alias="queued_when")
143+
pid: int | None
144+
executor: str | None
145+
executor_config: Annotated[str, BeforeValidator(str)]
146+
147+
148+
class TaskInstanceHistoryCollectionResponse(BaseModel):
149+
"""TaskInstanceHistory Collection serializer for responses."""
150+
151+
task_instances: list[TaskInstanceHistoryResponse]
152+
total_entries: int

airflow/api_fastapi/core_api/openapi/v1-generated.yaml

Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3676,6 +3676,76 @@ paths:
36763676
application/json:
36773677
schema:
36783678
$ref: '#/components/schemas/HTTPValidationError'
3679+
/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/tries/{task_try_number}:
3680+
get:
3681+
tags:
3682+
- Task Instance
3683+
summary: Get Task Instance Try Details
3684+
description: Get task instance details by try number.
3685+
operationId: get_task_instance_try_details
3686+
parameters:
3687+
- name: dag_id
3688+
in: path
3689+
required: true
3690+
schema:
3691+
type: string
3692+
title: Dag Id
3693+
- name: dag_run_id
3694+
in: path
3695+
required: true
3696+
schema:
3697+
type: string
3698+
title: Dag Run Id
3699+
- name: task_id
3700+
in: path
3701+
required: true
3702+
schema:
3703+
type: string
3704+
title: Task Id
3705+
- name: task_try_number
3706+
in: path
3707+
required: true
3708+
schema:
3709+
type: integer
3710+
title: Task Try Number
3711+
- name: map_index
3712+
in: query
3713+
required: false
3714+
schema:
3715+
type: integer
3716+
default: -1
3717+
title: Map Index
3718+
responses:
3719+
'200':
3720+
description: Successful Response
3721+
content:
3722+
application/json:
3723+
schema:
3724+
$ref: '#/components/schemas/TaskInstanceHistoryResponse'
3725+
'401':
3726+
content:
3727+
application/json:
3728+
schema:
3729+
$ref: '#/components/schemas/HTTPExceptionResponse'
3730+
description: Unauthorized
3731+
'403':
3732+
content:
3733+
application/json:
3734+
schema:
3735+
$ref: '#/components/schemas/HTTPExceptionResponse'
3736+
description: Forbidden
3737+
'404':
3738+
content:
3739+
application/json:
3740+
schema:
3741+
$ref: '#/components/schemas/HTTPExceptionResponse'
3742+
description: Not Found
3743+
'422':
3744+
description: Validation Error
3745+
content:
3746+
application/json:
3747+
schema:
3748+
$ref: '#/components/schemas/HTTPValidationError'
36793749
/public/dags/{dag_id}/tasks/:
36803750
get:
36813751
tags:
@@ -6197,6 +6267,126 @@ components:
61976267
- total_entries
61986268
title: TaskInstanceCollectionResponse
61996269
description: Task Instance Collection serializer for responses.
6270+
TaskInstanceHistoryResponse:
6271+
properties:
6272+
task_id:
6273+
type: string
6274+
title: Task Id
6275+
dag_id:
6276+
type: string
6277+
title: Dag Id
6278+
dag_run_id:
6279+
type: string
6280+
title: Dag Run Id
6281+
map_index:
6282+
type: integer
6283+
title: Map Index
6284+
start_date:
6285+
anyOf:
6286+
- type: string
6287+
format: date-time
6288+
- type: 'null'
6289+
title: Start Date
6290+
end_date:
6291+
anyOf:
6292+
- type: string
6293+
format: date-time
6294+
- type: 'null'
6295+
title: End Date
6296+
duration:
6297+
anyOf:
6298+
- type: number
6299+
- type: 'null'
6300+
title: Duration
6301+
state:
6302+
anyOf:
6303+
- $ref: '#/components/schemas/TaskInstanceState'
6304+
- type: 'null'
6305+
try_number:
6306+
type: integer
6307+
title: Try Number
6308+
max_tries:
6309+
type: integer
6310+
title: Max Tries
6311+
task_display_name:
6312+
type: string
6313+
title: Task Display Name
6314+
hostname:
6315+
anyOf:
6316+
- type: string
6317+
- type: 'null'
6318+
title: Hostname
6319+
unixname:
6320+
anyOf:
6321+
- type: string
6322+
- type: 'null'
6323+
title: Unixname
6324+
pool:
6325+
type: string
6326+
title: Pool
6327+
pool_slots:
6328+
type: integer
6329+
title: Pool Slots
6330+
queue:
6331+
anyOf:
6332+
- type: string
6333+
- type: 'null'
6334+
title: Queue
6335+
priority_weight:
6336+
anyOf:
6337+
- type: integer
6338+
- type: 'null'
6339+
title: Priority Weight
6340+
operator:
6341+
anyOf:
6342+
- type: string
6343+
- type: 'null'
6344+
title: Operator
6345+
queued_when:
6346+
anyOf:
6347+
- type: string
6348+
format: date-time
6349+
- type: 'null'
6350+
title: Queued When
6351+
pid:
6352+
anyOf:
6353+
- type: integer
6354+
- type: 'null'
6355+
title: Pid
6356+
executor:
6357+
anyOf:
6358+
- type: string
6359+
- type: 'null'
6360+
title: Executor
6361+
executor_config:
6362+
type: string
6363+
title: Executor Config
6364+
type: object
6365+
required:
6366+
- task_id
6367+
- dag_id
6368+
- dag_run_id
6369+
- map_index
6370+
- start_date
6371+
- end_date
6372+
- duration
6373+
- state
6374+
- try_number
6375+
- max_tries
6376+
- task_display_name
6377+
- hostname
6378+
- unixname
6379+
- pool
6380+
- pool_slots
6381+
- queue
6382+
- priority_weight
6383+
- operator
6384+
- queued_when
6385+
- pid
6386+
- executor
6387+
- executor_config
6388+
title: TaskInstanceHistoryResponse
6389+
description: TaskInstanceHistory serializer for responses.
62006390
TaskInstanceResponse:
62016391
properties:
62026392
id:

airflow/api_fastapi/core_api/routes/public/task_instances.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,15 @@
5050
from airflow.api_fastapi.core_api.datamodels.task_instances import (
5151
TaskDependencyCollectionResponse,
5252
TaskInstanceCollectionResponse,
53+
TaskInstanceHistoryResponse,
5354
TaskInstanceResponse,
5455
TaskInstancesBatchBody,
5556
)
5657
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
5758
from airflow.exceptions import TaskNotFound
59+
from airflow.models import Base
5860
from airflow.models.taskinstance import TaskInstance as TI
61+
from airflow.models.taskinstancehistory import TaskInstanceHistory as TIH
5962
from airflow.ti_deps.dep_context import DepContext
6063
from airflow.ti_deps.dependencies_deps import SCHEDULER_QUEUED_DEPS
6164
from airflow.utils.db import get_query_count
@@ -412,3 +415,38 @@ def get_task_instances_batch(
412415
],
413416
total_entries=total_entries,
414417
)
418+
419+
420+
@task_instances_router.get(
421+
"/{task_id}/tries/{task_try_number}",
422+
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
423+
)
424+
def get_task_instance_try_details(
425+
dag_id: str,
426+
dag_run_id: str,
427+
task_id: str,
428+
task_try_number: int,
429+
session: Annotated[Session, Depends(get_session)],
430+
map_index: int = -1,
431+
) -> TaskInstanceHistoryResponse:
432+
"""Get task instance details by try number."""
433+
434+
def _query(orm_object: Base) -> TI | TIH | None:
435+
query = select(orm_object).where(
436+
orm_object.dag_id == dag_id,
437+
orm_object.run_id == dag_run_id,
438+
orm_object.task_id == task_id,
439+
orm_object.try_number == task_try_number,
440+
orm_object.map_index == map_index,
441+
)
442+
443+
task_instance = session.scalar(query)
444+
return task_instance
445+
446+
result = _query(TI) or _query(TIH)
447+
if result is None:
448+
raise HTTPException(
449+
status.HTTP_404_NOT_FOUND,
450+
f"The Task Instance with dag_id: `{dag_id}`, run_id: `{dag_run_id}`, task_id: `{task_id}`, try_number: `{task_try_number}` and map_index: `{map_index}` was not found",
451+
)
452+
return TaskInstanceHistoryResponse.model_validate(result, from_attributes=True)

airflow/ui/openapi-gen/queries/common.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1001,6 +1001,33 @@ export const UseTaskInstanceServiceGetTaskInstancesKeyFn = (
10011001
},
10021002
]),
10031003
];
1004+
export type TaskInstanceServiceGetTaskInstanceTryDetailsDefaultResponse =
1005+
Awaited<ReturnType<typeof TaskInstanceService.getTaskInstanceTryDetails>>;
1006+
export type TaskInstanceServiceGetTaskInstanceTryDetailsQueryResult<
1007+
TData = TaskInstanceServiceGetTaskInstanceTryDetailsDefaultResponse,
1008+
TError = unknown,
1009+
> = UseQueryResult<TData, TError>;
1010+
export const useTaskInstanceServiceGetTaskInstanceTryDetailsKey =
1011+
"TaskInstanceServiceGetTaskInstanceTryDetails";
1012+
export const UseTaskInstanceServiceGetTaskInstanceTryDetailsKeyFn = (
1013+
{
1014+
dagId,
1015+
dagRunId,
1016+
mapIndex,
1017+
taskId,
1018+
taskTryNumber,
1019+
}: {
1020+
dagId: string;
1021+
dagRunId: string;
1022+
mapIndex?: number;
1023+
taskId: string;
1024+
taskTryNumber: number;
1025+
},
1026+
queryKey?: Array<unknown>,
1027+
) => [
1028+
useTaskInstanceServiceGetTaskInstanceTryDetailsKey,
1029+
...(queryKey ?? [{ dagId, dagRunId, mapIndex, taskId, taskTryNumber }]),
1030+
];
10041031
export type TaskServiceGetTasksDefaultResponse = Awaited<
10051032
ReturnType<typeof TaskService.getTasks>
10061033
>;

airflow/ui/openapi-gen/queries/prefetch.ts

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1351,6 +1351,51 @@ export const prefetchUseTaskInstanceServiceGetTaskInstances = (
13511351
updatedAtLte,
13521352
}),
13531353
});
1354+
/**
1355+
* Get Task Instance Try Details
1356+
* Get task instance details by try number.
1357+
* @param data The data for the request.
1358+
* @param data.dagId
1359+
* @param data.dagRunId
1360+
* @param data.taskId
1361+
* @param data.taskTryNumber
1362+
* @param data.mapIndex
1363+
* @returns TaskInstanceHistoryResponse Successful Response
1364+
* @throws ApiError
1365+
*/
1366+
export const prefetchUseTaskInstanceServiceGetTaskInstanceTryDetails = (
1367+
queryClient: QueryClient,
1368+
{
1369+
dagId,
1370+
dagRunId,
1371+
mapIndex,
1372+
taskId,
1373+
taskTryNumber,
1374+
}: {
1375+
dagId: string;
1376+
dagRunId: string;
1377+
mapIndex?: number;
1378+
taskId: string;
1379+
taskTryNumber: number;
1380+
},
1381+
) =>
1382+
queryClient.prefetchQuery({
1383+
queryKey: Common.UseTaskInstanceServiceGetTaskInstanceTryDetailsKeyFn({
1384+
dagId,
1385+
dagRunId,
1386+
mapIndex,
1387+
taskId,
1388+
taskTryNumber,
1389+
}),
1390+
queryFn: () =>
1391+
TaskInstanceService.getTaskInstanceTryDetails({
1392+
dagId,
1393+
dagRunId,
1394+
mapIndex,
1395+
taskId,
1396+
taskTryNumber,
1397+
}),
1398+
});
13541399
/**
13551400
* Get Tasks
13561401
* Get tasks for DAG.

0 commit comments

Comments
 (0)