Skip to content

Commit

Permalink
AIP-81 Add Overwrite for Bulk Insert Pool API (#45397)
Browse files Browse the repository at this point in the history
  • Loading branch information
jason810496 authored Jan 5, 2025
1 parent 8a3d0f4 commit 8d37497
Show file tree
Hide file tree
Showing 9 changed files with 238 additions and 76 deletions.
1 change: 1 addition & 0 deletions airflow/api_fastapi/core_api/datamodels/pools.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,4 @@ class PoolPostBulkBody(BaseModel):
"""Pools serializer for post bodies."""

pools: list[PoolPostBody]
overwrite: bool | None = Field(default=False)
20 changes: 16 additions & 4 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3999,12 +3999,12 @@ paths:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/pools/bulk:
post:
put:
tags:
- Pool
summary: Post Pools
summary: Put Pools
description: Create multiple pools.
operationId: post_pools
operationId: put_pools
requestBody:
content:
application/json:
Expand All @@ -4013,7 +4013,7 @@ paths:
required: true
responses:
'201':
description: Successful Response
description: Created
content:
application/json:
schema:
Expand All @@ -4036,6 +4036,12 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
'200':
description: Created with overwriting
content:
application/json:
schema:
$ref: '#/components/schemas/PoolCollectionResponse'
'422':
description: Validation Error
content:
Expand Down Expand Up @@ -8645,6 +8651,12 @@ components:
$ref: '#/components/schemas/PoolPostBody'
type: array
title: Pools
overwrite:
anyOf:
- type: boolean
- type: 'null'
title: Overwrite
default: false
type: object
required:
- pools
Expand Down
48 changes: 38 additions & 10 deletions airflow/api_fastapi/core_api/routes/public/pools.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from typing import Annotated, cast

from fastapi import Depends, HTTPException, Query, status
from fastapi import Depends, HTTPException, Query, Response, status
from fastapi.exceptions import RequestValidationError
from pydantic import ValidationError
from sqlalchemy import delete, select
Expand Down Expand Up @@ -176,21 +176,49 @@ def post_pool(
return pool


@pools_router.post(
@pools_router.put(
"/bulk",
status_code=status.HTTP_201_CREATED,
responses=create_openapi_http_exception_doc(
[
status.HTTP_409_CONFLICT, # handled by global exception handler
]
),
responses={
**create_openapi_http_exception_doc(
[
status.HTTP_409_CONFLICT, # handled by global exception handler
]
),
status.HTTP_201_CREATED: {
"description": "Created",
"model": PoolCollectionResponse,
},
status.HTTP_200_OK: {
"description": "Created with overwriting",
"model": PoolCollectionResponse,
},
},
)
def post_pools(
body: PoolPostBulkBody,
def put_pools(
response: Response,
put_body: PoolPostBulkBody,
session: SessionDep,
) -> PoolCollectionResponse:
"""Create multiple pools."""
pools = [Pool(**body.model_dump()) for body in body.pools]
response.status_code = status.HTTP_201_CREATED if not put_body.overwrite else status.HTTP_200_OK
pools: list[Pool]
if not put_body.overwrite:
pools = [Pool(**body.model_dump()) for body in put_body.pools]
else:
pool_names = [pool.pool for pool in put_body.pools]
existed_pools = session.execute(select(Pool).filter(Pool.pool.in_(pool_names))).scalars()
existed_pools_dict = {pool.pool: pool for pool in existed_pools}
pools = []
# if pool already exists, update the corresponding pool, else add a new pool
for body in put_body.pools:
if body.pool in existed_pools_dict:
pool = existed_pools_dict[body.pool]
for key, val in body.model_dump().items():
setattr(pool, key, val)
pools.append(pool)
else:
pools.append(Pool(**body.model_dump()))
session.add_all(pools)
return PoolCollectionResponse(
pools=cast(list[PoolResponse], pools),
Expand Down
2 changes: 1 addition & 1 deletion airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1616,7 +1616,6 @@ export type TaskInstanceServicePostClearTaskInstancesMutationResult = Awaited<
ReturnType<typeof TaskInstanceService.postClearTaskInstances>
>;
export type PoolServicePostPoolMutationResult = Awaited<ReturnType<typeof PoolService.postPool>>;
export type PoolServicePostPoolsMutationResult = Awaited<ReturnType<typeof PoolService.postPools>>;
export type VariableServicePostVariableMutationResult = Awaited<
ReturnType<typeof VariableService.postVariable>
>;
Expand All @@ -1635,6 +1634,7 @@ export type BackfillServiceCancelBackfillMutationResult = Awaited<
export type ConnectionServicePutConnectionsMutationResult = Awaited<
ReturnType<typeof ConnectionService.putConnections>
>;
export type PoolServicePutPoolsMutationResult = Awaited<ReturnType<typeof PoolService.putPools>>;
export type DagParsingServiceReparseDagFileMutationResult = Awaited<
ReturnType<typeof DagParsingService.reparseDagFile>
>;
Expand Down
73 changes: 37 additions & 36 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3033,42 +3033,6 @@ export const usePoolServicePostPool = <
mutationFn: ({ requestBody }) => PoolService.postPool({ requestBody }) as unknown as Promise<TData>,
...options,
});
/**
* Post Pools
* Create multiple pools.
* @param data The data for the request.
* @param data.requestBody
* @returns PoolCollectionResponse Successful Response
* @throws ApiError
*/
export const usePoolServicePostPools = <
TData = Common.PoolServicePostPoolsMutationResult,
TError = unknown,
TContext = unknown,
>(
options?: Omit<
UseMutationOptions<
TData,
TError,
{
requestBody: PoolPostBulkBody;
},
TContext
>,
"mutationFn"
>,
) =>
useMutation<
TData,
TError,
{
requestBody: PoolPostBulkBody;
},
TContext
>({
mutationFn: ({ requestBody }) => PoolService.postPools({ requestBody }) as unknown as Promise<TData>,
...options,
});
/**
* Post Variable
* Create a variable.
Expand Down Expand Up @@ -3292,6 +3256,43 @@ export const useConnectionServicePutConnections = <
ConnectionService.putConnections({ requestBody }) as unknown as Promise<TData>,
...options,
});
/**
* Put Pools
* Create multiple pools.
* @param data The data for the request.
* @param data.requestBody
* @returns PoolCollectionResponse Created with overwriting
* @returns PoolCollectionResponse Created
* @throws ApiError
*/
export const usePoolServicePutPools = <
TData = Common.PoolServicePutPoolsMutationResult,
TError = unknown,
TContext = unknown,
>(
options?: Omit<
UseMutationOptions<
TData,
TError,
{
requestBody: PoolPostBulkBody;
},
TContext
>,
"mutationFn"
>,
) =>
useMutation<
TData,
TError,
{
requestBody: PoolPostBulkBody;
},
TContext
>({
mutationFn: ({ requestBody }) => PoolService.putPools({ requestBody }) as unknown as Promise<TData>,
...options,
});
/**
* Reparse Dag File
* Request re-parsing a DAG file.
Expand Down
12 changes: 12 additions & 0 deletions airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3767,6 +3767,18 @@ export const $PoolPostBulkBody = {
type: "array",
title: "Pools",
},
overwrite: {
anyOf: [
{
type: "boolean",
},
{
type: "null",
},
],
title: "Overwrite",
default: false,
},
},
type: "object",
required: ["pools"],
Expand Down
13 changes: 7 additions & 6 deletions airflow/ui/openapi-gen/requests/services.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ import type {
GetPoolsResponse,
PostPoolData,
PostPoolResponse,
PostPoolsData,
PostPoolsResponse,
PutPoolsData,
PutPoolsResponse,
GetProvidersData,
GetProvidersResponse,
GetXcomEntryData,
Expand Down Expand Up @@ -2701,16 +2701,17 @@ export class PoolService {
}

/**
* Post Pools
* Put Pools
* Create multiple pools.
* @param data The data for the request.
* @param data.requestBody
* @returns PoolCollectionResponse Successful Response
* @returns PoolCollectionResponse Created with overwriting
* @returns PoolCollectionResponse Created
* @throws ApiError
*/
public static postPools(data: PostPoolsData): CancelablePromise<PostPoolsResponse> {
public static putPools(data: PutPoolsData): CancelablePromise<PutPoolsResponse> {
return __request(OpenAPI, {
method: "POST",
method: "PUT",
url: "/public/pools/bulk",
body: data.requestBody,
mediaType: "application/json",
Expand Down
15 changes: 10 additions & 5 deletions airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -904,6 +904,7 @@ export type PoolPostBody = {
*/
export type PoolPostBulkBody = {
pools: Array<PoolPostBody>;
overwrite?: boolean | null;
};

/**
Expand Down Expand Up @@ -2068,11 +2069,11 @@ export type PostPoolData = {

export type PostPoolResponse = PoolResponse;

export type PostPoolsData = {
export type PutPoolsData = {
requestBody: PoolPostBulkBody;
};

export type PostPoolsResponse = PoolCollectionResponse;
export type PutPoolsResponse = PoolCollectionResponse;

export type GetProvidersData = {
limit?: number;
Expand Down Expand Up @@ -4277,11 +4278,15 @@ export type $OpenApiTs = {
};
};
"/public/pools/bulk": {
post: {
req: PostPoolsData;
put: {
req: PutPoolsData;
res: {
/**
* Successful Response
* Created with overwriting
*/
200: PoolCollectionResponse;
/**
* Created
*/
201: PoolCollectionResponse;
/**
Expand Down
Loading

0 comments on commit 8d37497

Please sign in to comment.