Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove AIP-44 configuration from the code #44454

Merged
merged 1 commit into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 1 addition & 29 deletions airflow/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
from __future__ import annotations

import os
from argparse import Namespace

import argcomplete

Expand All @@ -36,8 +35,7 @@
# any possible import cycles with settings downstream.
from airflow import configuration
from airflow.cli import cli_parser
from airflow.configuration import AirflowConfigParser, write_webserver_configuration_if_needed
from airflow.exceptions import AirflowException
from airflow.configuration import write_webserver_configuration_if_needed


def main():
Expand All @@ -57,34 +55,8 @@ def main():
conf = write_default_airflow_configuration_if_needed()
if args.subcommand in ["webserver", "internal-api", "worker"]:
write_webserver_configuration_if_needed(conf)
configure_internal_api(args, conf)

args.func(args)


def configure_internal_api(args: Namespace, conf: AirflowConfigParser):
if conf.getboolean("core", "database_access_isolation", fallback=False):
if args.subcommand in ["worker", "dag-processor", "triggerer", "run"]:
# Untrusted components
if "AIRFLOW__DATABASE__SQL_ALCHEMY_CONN" in os.environ:
# make sure that the DB is not available for the components that should not access it
os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"] = "none://"
conf.set("database", "sql_alchemy_conn", "none://")
from airflow.api_internal.internal_api_call import InternalApiConfig

InternalApiConfig.set_use_internal_api(args.subcommand)
else:
# Trusted components (this setting is mostly for Breeze where db_isolation and DB are both set
db_connection_url = conf.get("database", "sql_alchemy_conn")
if not db_connection_url or db_connection_url == "none://":
raise AirflowException(
f"Running trusted components {args.subcommand} in db isolation mode "
f"requires connection to be configured via database/sql_alchemy_conn."
)
from airflow.api_internal.internal_api_call import InternalApiConfig

InternalApiConfig.set_use_database_access(args.subcommand)


if __name__ == "__main__":
main()
150 changes: 2 additions & 148 deletions airflow/api_internal/internal_api_call.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,13 @@

from __future__ import annotations

import inspect
import json
import logging
from functools import wraps
from http import HTTPStatus
from typing import Callable, TypeVar
from urllib.parse import urlparse

import requests
import tenacity
from urllib3.exceptions import NewConnectionError

from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException, AirflowException
from airflow.settings import _ENABLE_AIP_44, force_traceback_session_for_untrusted_components
from airflow.exceptions import AirflowException
from airflow.typing_compat import ParamSpec
from airflow.utils.jwt_signer import JWTSigner

PS = ParamSpec("PS")
RT = TypeVar("RT")
Expand All @@ -49,145 +39,9 @@ def __init__(self, message: str, status_code: HTTPStatus):
self.status_code = status_code


class InternalApiConfig:
"""Stores and caches configuration for Internal API."""

_use_internal_api = False
_internal_api_endpoint = ""

@staticmethod
def set_use_database_access(component: str):
"""
Block current component from using Internal API.

All methods decorated with internal_api_call will always be executed locally.`
This mode is needed for "trusted" components like Scheduler, Webserver, Internal Api server
"""
InternalApiConfig._use_internal_api = False
if not _ENABLE_AIP_44:
raise RuntimeError("The AIP_44 is not enabled so you cannot use it. ")
logger.info(
"DB isolation mode. But this is a trusted component and DB connection is set. "
"Using database direct access when running %s.",
component,
)

@staticmethod
def set_use_internal_api(component: str, allow_tests_to_use_db: bool = False):
if not _ENABLE_AIP_44:
raise RuntimeError("The AIP_44 is not enabled so you cannot use it. ")
internal_api_url = conf.get("core", "internal_api_url")
url_conf = urlparse(internal_api_url)
api_path = url_conf.path
if api_path in ["", "/"]:
# Add the default path if not given in the configuration
api_path = "/internal_api/v1/rpcapi"
if url_conf.scheme not in ["http", "https"]:
raise AirflowConfigException("[core]internal_api_url must start with http:// or https://")
internal_api_endpoint = f"{url_conf.scheme}://{url_conf.netloc}{api_path}"
InternalApiConfig._use_internal_api = True
InternalApiConfig._internal_api_endpoint = internal_api_endpoint
logger.info("DB isolation mode. Using internal_api when running %s.", component)
force_traceback_session_for_untrusted_components(allow_tests_to_use_db=allow_tests_to_use_db)

@staticmethod
def get_use_internal_api():
return InternalApiConfig._use_internal_api

@staticmethod
def get_internal_api_endpoint():
return InternalApiConfig._internal_api_endpoint


def internal_api_call(func: Callable[PS, RT]) -> Callable[PS, RT]:
"""
Allow methods to be executed in database isolation mode.

If [core]database_access_isolation is true then such method are not executed locally,
but instead RPC call is made to Database API (aka Internal API). This makes some components
decouple from direct Airflow database access.
Each decorated method must be present in METHODS list in airflow.api_internal.endpoints.rpc_api_endpoint.
Only static methods can be decorated. This decorator must be before "provide_session".

See [AIP-44](https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Internal+API)
for more information .
"""
from requests.exceptions import ConnectionError

def _is_retryable_exception(exception: BaseException) -> bool:
"""
Evaluate which exception types to retry.

This is especially demanded for cases where an application gateway or Kubernetes ingress can
not find a running instance of a webserver hosting the API (HTTP 502+504) or when the
HTTP request fails in general on network level.

Note that we want to fail on other general errors on the webserver not to send bad requests in an endless loop.
"""
retryable_status_codes = (HTTPStatus.BAD_GATEWAY, HTTPStatus.GATEWAY_TIMEOUT)
return (
isinstance(exception, AirflowHttpException)
and exception.status_code in retryable_status_codes
or isinstance(exception, (ConnectionError, NewConnectionError))
)

@tenacity.retry(
stop=tenacity.stop_after_attempt(10),
wait=tenacity.wait_exponential(min=1),
retry=tenacity.retry_if_exception(_is_retryable_exception),
before_sleep=tenacity.before_log(logger, logging.WARNING),
)
def make_jsonrpc_request(method_name: str, params_json: str) -> bytes:
signer = JWTSigner(
secret_key=conf.get("core", "internal_api_secret_key"),
expiration_time_in_seconds=conf.getint("core", "internal_api_clock_grace", fallback=30),
audience="api",
)
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"Authorization": signer.generate_signed_token({"method": method_name}),
}
data = {"jsonrpc": "2.0", "method": method_name, "params": params_json}
internal_api_endpoint = InternalApiConfig.get_internal_api_endpoint()
response = requests.post(url=internal_api_endpoint, data=json.dumps(data), headers=headers)
if response.status_code != 200:
raise AirflowHttpException(
f"Got {response.status_code}:{response.reason} when sending "
f"the internal api request: {response.text}",
HTTPStatus(response.status_code),
)
return response.content

@wraps(func)
def wrapper(*args, **kwargs):
use_internal_api = InternalApiConfig.get_use_internal_api()
if not use_internal_api:
return func(*args, **kwargs)
import traceback

tb = traceback.extract_stack()
if any(filename.endswith("conftest.py") for filename, _, _, _ in tb):
# This is a test fixture, we should not use internal API for it
return func(*args, **kwargs)

from airflow.serialization.serialized_objects import BaseSerialization # avoid circular import

bound = inspect.signature(func).bind(*args, **kwargs)
arguments_dict = dict(bound.arguments)
if "session" in arguments_dict:
del arguments_dict["session"]
if "cls" in arguments_dict: # used by @classmethod
del arguments_dict["cls"]

args_dict = BaseSerialization.serialize(arguments_dict, use_pydantic_models=True)
method_name = f"{func.__module__}.{func.__qualname__}"
result = make_jsonrpc_request(method_name, args_dict)
if result is None or result == b"":
return None
result = BaseSerialization.deserialize(json.loads(result), use_pydantic_models=True)
if isinstance(result, (KeyError, AttributeError, AirflowException)):
raise result
return result
return func(*args, **kwargs)

return wrapper
50 changes: 24 additions & 26 deletions airflow/cli/cli_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
from airflow import settings
from airflow.cli.commands.legacy_commands import check_legacy_command
from airflow.configuration import conf
from airflow.settings import _ENABLE_AIP_44
from airflow.utils.cli import ColorMode
from airflow.utils.module_loading import import_string
from airflow.utils.state import DagRunState, JobState
Expand Down Expand Up @@ -2071,32 +2070,31 @@ class GroupCommand(NamedTuple):
),
]

if _ENABLE_AIP_44:
core_commands.append(
ActionCommand(
name="internal-api",
help="Start an Airflow Internal API instance",
func=lazy_load_command("airflow.cli.commands.internal_api_command.internal_api"),
args=(
ARG_INTERNAL_API_PORT,
ARG_INTERNAL_API_WORKERS,
ARG_INTERNAL_API_WORKERCLASS,
ARG_INTERNAL_API_WORKER_TIMEOUT,
ARG_INTERNAL_API_HOSTNAME,
ARG_PID,
ARG_DAEMON,
ARG_STDOUT,
ARG_STDERR,
ARG_INTERNAL_API_ACCESS_LOGFILE,
ARG_INTERNAL_API_ERROR_LOGFILE,
ARG_INTERNAL_API_ACCESS_LOGFORMAT,
ARG_LOG_FILE,
ARG_SSL_CERT,
ARG_SSL_KEY,
ARG_DEBUG,
),
core_commands.append(
ActionCommand(
name="internal-api",
help="Start an Airflow Internal API instance",
func=lazy_load_command("airflow.cli.commands.internal_api_command.internal_api"),
args=(
ARG_INTERNAL_API_PORT,
ARG_INTERNAL_API_WORKERS,
ARG_INTERNAL_API_WORKERCLASS,
ARG_INTERNAL_API_WORKER_TIMEOUT,
ARG_INTERNAL_API_HOSTNAME,
ARG_PID,
ARG_DAEMON,
ARG_STDOUT,
ARG_STDERR,
ARG_INTERNAL_API_ACCESS_LOGFILE,
ARG_INTERNAL_API_ERROR_LOGFILE,
ARG_INTERNAL_API_ACCESS_LOGFORMAT,
ARG_LOG_FILE,
ARG_SSL_CERT,
ARG_SSL_KEY,
ARG_DEBUG,
),
)
),
)


def _remove_dag_id_opt(command: ActionCommand):
Expand Down
4 changes: 0 additions & 4 deletions airflow/cli/commands/dag_processor_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
from datetime import timedelta
from typing import Any

from airflow.api_internal.internal_api_call import InternalApiConfig
from airflow.cli.commands.daemon_utils import run_command_with_daemon_option
from airflow.configuration import conf
from airflow.dag_processing.manager import DagFileProcessorManager, reload_configuration_for_dag_processing
Expand All @@ -38,9 +37,6 @@ def _create_dag_processor_job_runner(args: Any) -> DagProcessorJobRunner:
"""Create DagFileProcessorProcess instance."""
processor_timeout_seconds: int = conf.getint("core", "dag_file_processor_timeout")
processor_timeout = timedelta(seconds=processor_timeout_seconds)
if InternalApiConfig.get_use_internal_api():
from airflow.models.renderedtifields import RenderedTaskInstanceFields # noqa: F401
from airflow.models.trigger import Trigger # noqa: F401
return DagProcessorJobRunner(
job=Job(),
processor=DagFileProcessorManager(
Expand Down
3 changes: 0 additions & 3 deletions airflow/cli/commands/db_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
from tenacity import Retrying, stop_after_attempt, wait_fixed

from airflow import settings
from airflow.api_internal.internal_api_call import InternalApiConfig
from airflow.exceptions import AirflowException
from airflow.utils import cli as cli_utils, db
from airflow.utils.db import _REVISION_HEADS_MAP
Expand Down Expand Up @@ -281,8 +280,6 @@ def shell(args):
@providers_configuration_loaded
def check(args):
"""Run a check command that checks if db is available."""
if InternalApiConfig.get_use_internal_api():
return
retries: int = args.retry
retry_delay: int = args.retry_delay

Expand Down
8 changes: 0 additions & 8 deletions airflow/cli/commands/internal_api_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
from sqlalchemy.engine.url import make_url

from airflow import settings
from airflow.api_internal.internal_api_call import InternalApiConfig
from airflow.cli.commands.daemon_utils import run_command_with_daemon_option
from airflow.cli.commands.webserver_command import GunicornMonitor
from airflow.configuration import conf
Expand Down Expand Up @@ -222,13 +221,6 @@ def create_app(config=None, testing=False):
if "SQLALCHEMY_ENGINE_OPTIONS" not in flask_app.config:
flask_app.config["SQLALCHEMY_ENGINE_OPTIONS"] = settings.prepare_engine_args()

if conf.getboolean("core", "database_access_isolation", fallback=False):
InternalApiConfig.set_use_database_access("Gunicorn worker initialization")
else:
raise AirflowConfigException(
"The internal-api component should only be run when database_access_isolation is enabled."
)

csrf = CSRFProtect()
csrf.init_app(flask_app)

Expand Down
13 changes: 1 addition & 12 deletions airflow/cli/commands/task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from sqlalchemy import select

from airflow import settings
from airflow.api_internal.internal_api_call import InternalApiConfig, internal_api_call
from airflow.api_internal.internal_api_call import internal_api_call
from airflow.cli.simple_table import AirflowConsole
from airflow.configuration import conf
from airflow.exceptions import AirflowException, DagRunNotFound, TaskDeferred, TaskInstanceNotFound
Expand Down Expand Up @@ -333,9 +333,6 @@ def _run_task_by_executor(args, dag: DAG, ti: TaskInstance) -> None:

def _run_task_by_local_task_job(args, ti: TaskInstance | TaskInstancePydantic) -> TaskReturnCode | None:
"""Run LocalTaskJob, which monitors the raw task execution process."""
if InternalApiConfig.get_use_internal_api():
from airflow.models.renderedtifields import RenderedTaskInstanceFields # noqa: F401
from airflow.models.trigger import Trigger # noqa: F401
job_runner = LocalTaskJobRunner(
job=Job(dag_id=ti.dag_id),
task_instance=ti,
Expand Down Expand Up @@ -490,14 +487,6 @@ def task_run(args, dag: DAG | None = None) -> TaskReturnCode | None:

log.info("Running %s on host %s", ti, hostname)

if not InternalApiConfig.get_use_internal_api():
# IMPORTANT, have to re-configure ORM with the NullPool, otherwise, each "run" command may leave
# behind multiple open sleeping connections while heartbeating, which could
# easily exceed the database connection limit when
# processing hundreds of simultaneous tasks.
# this should be last thing before running, to reduce likelihood of an open session
# which can cause trouble if running process in a fork.
settings.reconfigure_orm(disable_connection_pool=True)
task_return_code = None
try:
if args.interactive:
Expand Down
4 changes: 0 additions & 4 deletions airflow/executors/executor_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import os
from typing import TYPE_CHECKING

from airflow.api_internal.internal_api_call import InternalApiConfig
from airflow.exceptions import AirflowConfigException, UnknownExecutorException
from airflow.executors.executor_constants import (
CELERY_EXECUTOR,
Expand Down Expand Up @@ -293,9 +292,6 @@ def validate_database_executor_compatibility(cls, executor: type[BaseExecutor])
if os.environ.get("_AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK") == "1":
return

if InternalApiConfig.get_use_internal_api():
return

from airflow.settings import engine

# SQLite only works with single threaded executors
Expand Down
Loading
Loading