Skip to content

Commit

Permalink
Merge branch 'main' into add-deferrable-mode-to-google-cloud_storage_…
Browse files Browse the repository at this point in the history
…transofer
  • Loading branch information
tnk-ysk authored Jan 20, 2025
2 parents d235c1d + ee785a8 commit 36611d1
Show file tree
Hide file tree
Showing 158 changed files with 3,129 additions and 843 deletions.
1 change: 1 addition & 0 deletions .cherry_picker.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ repo = "airflow"
fix_commit_msg = false
default_branch = "main"
require_version_in_branch_name=false
draft_pr = true
2 changes: 1 addition & 1 deletion .github/actions/install-pre-commit/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ inputs:
default: "3.9"
uv-version:
description: 'uv version to use'
default: "0.5.14" # Keep this comment to allow automatic replacement of uv version
default: "0.5.20" # Keep this comment to allow automatic replacement of uv version
pre-commit-version:
description: 'pre-commit version to use'
default: "4.0.1" # Keep this comment to allow automatic replacement of pre-commit version
Expand Down
4 changes: 1 addition & 3 deletions .github/boring-cyborg.yml
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,7 @@ labelPRBasedOnFilePath:
- providers/tests/system/docker/**/*

provider:edge:
- providers/src/airflow/providers/edge/**/*
- docs/apache-airflow-providers-edge/**/*
- providers/tests/edge/**/*
- providers/edge/**

provider:elasticsearch:
- providers/src/airflow/providers/elasticsearch/**/*
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/backport-cli.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ jobs:
- name: Install Python dependencies
run: |
python -m pip install --upgrade pip
python -m pip install cherry-picker==2.4.0 requests==2.32.3
python -m pip install cherry-picker==2.5.0 requests==2.32.3
- name: Run backport script
id: execute-backport
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/basic-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ jobs:
- name: "Cleanup docker"
run: ./scripts/ci/cleanup_docker.sh
- name: Setup pnpm
uses: pnpm/[email protected]
uses: pnpm/action-setup@fe02b34f77f8bc703788d5817da081398fad5dd2 # v4.0.0
with:
version: 9
run_install: false
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ ARG PYTHON_BASE_IMAGE="python:3.9-slim-bookworm"
# Also use `force pip` label on your PR to swap all places we use `uv` to `pip`
ARG AIRFLOW_PIP_VERSION=24.3.1
# ARG AIRFLOW_PIP_VERSION="git+https://github.com/pypa/pip.git@main"
ARG AIRFLOW_UV_VERSION=0.5.14
ARG AIRFLOW_UV_VERSION=0.5.20
ARG AIRFLOW_USE_UV="false"
ARG UV_HTTP_TIMEOUT="300"
ARG AIRFLOW_IMAGE_REPOSITORY="https://github.com/apache/airflow"
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile.ci
Original file line number Diff line number Diff line change
Expand Up @@ -1268,7 +1268,7 @@ COPY --from=scripts common.sh install_packaging_tools.sh install_additional_depe
# Also use `force pip` label on your PR to swap all places we use `uv` to `pip`
ARG AIRFLOW_PIP_VERSION=24.3.1
# ARG AIRFLOW_PIP_VERSION="git+https://github.com/pypa/pip.git@main"
ARG AIRFLOW_UV_VERSION=0.5.14
ARG AIRFLOW_UV_VERSION=0.5.20
# TODO(potiuk): automate with upgrade check (possibly)
ARG AIRFLOW_PRE_COMMIT_VERSION="4.0.1"
ARG AIRFLOW_PRE_COMMIT_UV_VERSION="4.1.4"
Expand Down
92 changes: 0 additions & 92 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5989,68 +5989,6 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/variables/import:
post:
tags:
- Variable
summary: Import Variables
description: Import variables from a JSON file.
operationId: import_variables
parameters:
- name: action_if_exists
in: query
required: false
schema:
enum:
- overwrite
- fail
- skip
type: string
default: fail
title: Action If Exists
requestBody:
required: true
content:
multipart/form-data:
schema:
$ref: '#/components/schemas/Body_import_variables'
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/VariablesImportResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'400':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Bad Request
'409':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Conflict
'422':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unprocessable Entity
/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/logs/{try_number}:
get:
tags:
Expand Down Expand Up @@ -6629,16 +6567,6 @@ components:
- status
title: BaseInfoResponse
description: Base info serializer for responses.
Body_import_variables:
properties:
file:
type: string
format: binary
title: File
type: object
required:
- file
title: Body_import_variables
ClearTaskInstancesBody:
properties:
dry_run:
Expand Down Expand Up @@ -10066,26 +9994,6 @@ components:
- is_encrypted
title: VariableResponse
description: Variable serializer for responses.
VariablesImportResponse:
properties:
created_variable_keys:
items:
type: string
type: array
title: Created Variable Keys
import_count:
type: integer
title: Import Count
created_count:
type: integer
title: Created Count
type: object
required:
- created_variable_keys
- import_count
- created_count
title: VariablesImportResponse
description: Import Variables serializer for responses.
VersionInfo:
properties:
version:
Expand Down
59 changes: 2 additions & 57 deletions airflow/api_fastapi/core_api/routes/public/variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@
# under the License.
from __future__ import annotations

import json
from typing import Annotated, Literal
from typing import Annotated

from fastapi import Depends, HTTPException, Query, UploadFile, status
from fastapi import Depends, HTTPException, Query, status
from fastapi.exceptions import RequestValidationError
from pydantic import ValidationError
from sqlalchemy import select
Expand All @@ -39,7 +38,6 @@
VariableBulkResponse,
VariableCollectionResponse,
VariableResponse,
VariablesImportResponse,
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.services.public.variables import (
Expand Down Expand Up @@ -192,59 +190,6 @@ def post_variable(
return variable


@variables_router.post(
"/import",
status_code=status.HTTP_200_OK,
responses=create_openapi_http_exception_doc(
[status.HTTP_400_BAD_REQUEST, status.HTTP_409_CONFLICT, status.HTTP_422_UNPROCESSABLE_ENTITY]
),
)
def import_variables(
file: UploadFile,
session: SessionDep,
action_if_exists: Literal["overwrite", "fail", "skip"] = "fail",
) -> VariablesImportResponse:
"""Import variables from a JSON file."""
try:
file_content = file.file.read().decode("utf-8")
variables = json.loads(file_content)

if not isinstance(variables, dict):
raise ValueError("Uploaded JSON must contain key-value pairs.")
except (json.JSONDecodeError, ValueError) as e:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid JSON format: {e}")

if not variables:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="No variables found in the provided JSON.",
)

existing_keys = {variable for variable in session.execute(select(Variable.key)).scalars()}
import_keys = set(variables.keys())

matched_keys = existing_keys & import_keys

if action_if_exists == "fail" and matched_keys:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"The variables with these keys: {matched_keys} already exists.",
)
elif action_if_exists == "skip":
create_keys = import_keys - matched_keys
else:
create_keys = import_keys

for key in create_keys:
Variable.set(key=key, value=variables[key], session=session)

return VariablesImportResponse(
created_count=len(create_keys),
import_count=len(import_keys),
created_variable_keys=list(create_keys),
)


@variables_router.patch("")
def bulk_variables(
request: VariableBulkBody,
Expand Down
36 changes: 36 additions & 0 deletions airflow/api_fastapi/execution_api/datamodels/asset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from airflow.api_fastapi.core_api.base import BaseModel


class AssetResponse(BaseModel):
"""Asset schema for responses with fields that are needed for Runtime."""

name: str
uri: str
group: str
extra: dict | None = None


class AssetAliasResponse(BaseModel):
"""Asset alias schema with fields that are needed for Runtime."""

name: str
group: str
10 changes: 9 additions & 1 deletion airflow/api_fastapi/execution_api/routes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,17 @@
from __future__ import annotations

from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.execution_api.routes import connections, health, task_instances, variables, xcoms
from airflow.api_fastapi.execution_api.routes import (
assets,
connections,
health,
task_instances,
variables,
xcoms,
)

execution_api_router = AirflowRouter()
execution_api_router.include_router(assets.router, prefix="/assets", tags=["Assets"])
execution_api_router.include_router(connections.router, prefix="/connections", tags=["Connections"])
execution_api_router.include_router(health.router, tags=["Health"])
execution_api_router.include_router(task_instances.router, prefix="/task-instances", tags=["Task Instances"])
Expand Down
71 changes: 71 additions & 0 deletions airflow/api_fastapi/execution_api/routes/assets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from typing import Annotated

from fastapi import HTTPException, Query, status
from sqlalchemy import select

from airflow.api_fastapi.common.db.common import SessionDep
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.execution_api.datamodels.asset import AssetResponse
from airflow.models.asset import AssetModel

# TODO: Add dependency on JWT token
router = AirflowRouter(
responses={
status.HTTP_404_NOT_FOUND: {"description": "Asset not found"},
status.HTTP_401_UNAUTHORIZED: {"description": "Unauthorized"},
},
)


@router.get("/by-name")
def get_asset_by_name(
name: Annotated[str, Query(description="The name of the Asset")],
session: SessionDep,
) -> AssetResponse:
"""Get an Airflow Asset by `name`."""
asset = session.scalar(select(AssetModel).where(AssetModel.name == name, AssetModel.active.has()))
_raise_if_not_found(asset, f"Asset with name {name} not found")

return AssetResponse.model_validate(asset)


@router.get("/by-uri")
def get_asset_by_uri(
uri: Annotated[str, Query(description="The URI of the Asset")],
session: SessionDep,
) -> AssetResponse:
"""Get an Airflow Asset by `uri`."""
asset = session.scalar(select(AssetModel).where(AssetModel.uri == uri, AssetModel.active.has()))
_raise_if_not_found(asset, f"Asset with URI {uri} not found")

return AssetResponse.model_validate(asset)


def _raise_if_not_found(asset, msg):
if asset is None:
raise HTTPException(
status.HTTP_404_NOT_FOUND,
detail={
"reason": "not_found",
"message": msg,
},
)
4 changes: 4 additions & 0 deletions airflow/cli/commands/remote_commands/config_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,10 @@ def message(self) -> str:
config=ConfigParameter("scheduler", "statsd_custom_client_path"),
renamed_to=ConfigParameter("metrics", "statsd_custom_client_path"),
),
ConfigChange(
config=ConfigParameter("scheduler", "dag_dir_list_interval"),
renamed_to=ConfigParameter("dag_bundles", "refresh_interval"),
),
# celery
ConfigChange(
config=ConfigParameter("celery", "stalled_task_timeout"),
Expand Down
Loading

0 comments on commit 36611d1

Please sign in to comment.