From 6faa720cd8f206a5af718752d27c1dd5d38c96d1 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Mon, 18 Nov 2024 17:44:29 +0000 Subject: [PATCH] Remove XCom pickling (#43905) XCom pickling was disabled by default in Airflow 2.0.0: https://airflow.apache.org/docs/apache-airflow/1.10.15/configurations-ref.html#enable-xcom-pickling --- .../api_connexion/endpoints/xcom_endpoint.py | 2 +- .../core_api/routes/public/xcom.py | 2 +- airflow/config_templates/config.yml | 9 --- airflow/models/taskinstance.py | 4 +- airflow/models/xcom.py | 28 +------ newsfragments/aip-72.significant.rst | 10 +++ .../microsoft/azure/operators/adx.py | 3 +- .../microsoft/winrm/operators/winrm.py | 3 +- .../airflow/providers/ssh/operators/ssh.py | 3 +- providers/tests/sftp/operators/test_sftp.py | 7 ++ .../endpoints/test_xcom_endpoint.py | 30 -------- .../api_connexion/schemas/test_xcom_schema.py | 51 ------------ .../core_api/routes/public/test_xcom.py | 21 ----- tests/models/test_xcom.py | 77 +------------------ 14 files changed, 31 insertions(+), 219 deletions(-) diff --git a/airflow/api_connexion/endpoints/xcom_endpoint.py b/airflow/api_connexion/endpoints/xcom_endpoint.py index 1019d0199ec00..c86617391ab12 100644 --- a/airflow/api_connexion/endpoints/xcom_endpoint.py +++ b/airflow/api_connexion/endpoints/xcom_endpoint.py @@ -127,7 +127,7 @@ def get_xcom_entry( stub.value = XCom.deserialize_value(stub) item = stub - if stringify or conf.getboolean("core", "enable_xcom_pickling"): + if stringify: return xcom_schema_string.dump(item) return xcom_schema_native.dump(item) diff --git a/airflow/api_fastapi/core_api/routes/public/xcom.py b/airflow/api_fastapi/core_api/routes/public/xcom.py index ebd0545742c4c..ef13c927e8636 100644 --- a/airflow/api_fastapi/core_api/routes/public/xcom.py +++ b/airflow/api_fastapi/core_api/routes/public/xcom.py @@ -88,7 +88,7 @@ def get_xcom_entry( xcom_stub.value = XCom.deserialize_value(xcom_stub) item = xcom_stub - if stringify or conf.getboolean("core", 
"enable_xcom_pickling"): + if stringify: return XComResponseString.model_validate(item, from_attributes=True) return XComResponseNative.model_validate(item, from_attributes=True) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index eba9f7b8c70ed..04ac0d802a999 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -249,15 +249,6 @@ core: type: string example: ~ default: "False" - enable_xcom_pickling: - description: | - Whether to enable pickling for xcom (note that this is insecure and allows for - RCE exploits). - version_added: ~ - type: string - example: ~ - default: "False" - see_also: "https://docs.python.org/3/library/pickle.html#comparison-with-json" allowed_deserialization_classes: description: | What classes can be imported during deserialization. This is a multi line value. diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 521077ea761e5..dd0bf3916a4f2 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -3368,9 +3368,7 @@ def xcom_push( Make an XCom available for tasks to pull. :param key: Key to store the value under. - :param value: Value to store. What types are possible depends on whether - ``enable_xcom_pickling`` is true or not. If so, this can be any - picklable object; only be JSON-serializable may be used otherwise. + :param value: Value to store. Only JSON-serializable values may be used. 
""" XCom.set( key=key, diff --git a/airflow/models/xcom.py b/airflow/models/xcom.py index 69843db5ab5e6..f8b0fac330e08 100644 --- a/airflow/models/xcom.py +++ b/airflow/models/xcom.py @@ -20,7 +20,6 @@ import inspect import json import logging -import pickle from typing import TYPE_CHECKING, Any, Iterable, cast from sqlalchemy import ( @@ -455,21 +454,8 @@ def serialize_value( run_id: str | None = None, map_index: int | None = None, ) -> Any: - """Serialize XCom value to str or pickled object.""" - if conf.getboolean("core", "enable_xcom_pickling"): - return pickle.dumps(value) - try: - return json.dumps(value, cls=XComEncoder).encode("UTF-8") - except (ValueError, TypeError) as ex: - log.error( - "%s." - " If you are using pickle instead of JSON for XCom," - " then you need to enable pickle support for XCom" - " in your airflow config or make sure to decorate your" - " object with attr.", - ex, - ) - raise + """Serialize XCom value to JSON str.""" + return json.dumps(value, cls=XComEncoder).encode("UTF-8") @staticmethod def _deserialize_value(result: XCom, orm: bool) -> Any: @@ -479,14 +465,8 @@ def _deserialize_value(result: XCom, orm: bool) -> Any: if result.value is None: return None - if conf.getboolean("core", "enable_xcom_pickling"): - try: - return pickle.loads(result.value) - except pickle.UnpicklingError: - return json.loads(result.value.decode("UTF-8"), cls=XComDecoder, object_hook=object_hook) - else: - # Since xcom_pickling is disabled, we should only try to deserialize with JSON - return json.loads(result.value.decode("UTF-8"), cls=XComDecoder, object_hook=object_hook) + + return json.loads(result.value.decode("UTF-8"), cls=XComDecoder, object_hook=object_hook) @staticmethod def deserialize_value(result: XCom) -> Any: diff --git a/newsfragments/aip-72.significant.rst b/newsfragments/aip-72.significant.rst index 2baafad7ab8b9..80b533926c540 100644 --- a/newsfragments/aip-72.significant.rst +++ b/newsfragments/aip-72.significant.rst @@ -17,3 +17,13 @@ 
As part of this change the following breaking changes have occurred: - Shipping DAGs via pickle is no longer supported This was a feature that was not widely used and was a security risk. It has been removed. + +- Pickling is no longer supported for XCom serialization. + + XCom data will no longer support pickling. This change is intended to improve security and simplify data + handling by supporting JSON-only serialization. DAGs that depend on XCom pickling must update to use JSON-serializable data. + + As part of that change, the ``[core] enable_xcom_pickling`` configuration option has been removed. + + If you still need to use pickling, you can use a custom XCom backend that stores references in the metadata DB and + keeps the pickled data in separate storage such as S3. diff --git a/providers/src/airflow/providers/microsoft/azure/operators/adx.py b/providers/src/airflow/providers/microsoft/azure/operators/adx.py index 1621fde3489eb..1a8c1a07f018d 100644 --- a/providers/src/airflow/providers/microsoft/azure/operators/adx.py +++ b/providers/src/airflow/providers/microsoft/azure/operators/adx.py @@ -85,7 +85,8 @@ def execute(self, context: Context) -> KustoResultTable | str: https://docs.microsoft.com/en-us/azure/kusto/api/rest/response2 """ response = self.hook.run_query(self.query, self.database, self.options) - if conf.getboolean("core", "enable_xcom_pickling"): + # TODO: Remove this after minimum Airflow version is 3.0 + if conf.getboolean("core", "enable_xcom_pickling", fallback=False): return response.primary_results[0] else: return str(response.primary_results[0]) diff --git a/providers/src/airflow/providers/microsoft/winrm/operators/winrm.py b/providers/src/airflow/providers/microsoft/winrm/operators/winrm.py index 0662333c7886c..1e91cd1daf56f 100644 --- a/providers/src/airflow/providers/microsoft/winrm/operators/winrm.py +++ b/providers/src/airflow/providers/microsoft/winrm/operators/winrm.py @@ -97,7 +97,8 @@ def execute(self, context: Context) -> 
list | str: if return_code == 0: # returning output if do_xcom_push is set - enable_pickling = conf.getboolean("core", "enable_xcom_pickling") + # TODO: Remove this after minimum Airflow version is 3.0 + enable_pickling = conf.getboolean("core", "enable_xcom_pickling", fallback=False) if enable_pickling: return stdout_buffer diff --git a/providers/src/airflow/providers/ssh/operators/ssh.py b/providers/src/airflow/providers/ssh/operators/ssh.py index 9847614eafce5..b8dc412681107 100644 --- a/providers/src/airflow/providers/ssh/operators/ssh.py +++ b/providers/src/airflow/providers/ssh/operators/ssh.py @@ -188,7 +188,8 @@ def execute(self, context=None) -> bytes | str: with self.get_ssh_client() as ssh_client: result = self.run_ssh_client_command(ssh_client, self.command, context=context) - enable_pickling = conf.getboolean("core", "enable_xcom_pickling") + # TODO: Remove this after minimum Airflow version is 3.0 + enable_pickling = conf.getboolean("core", "enable_xcom_pickling", fallback=False) if not enable_pickling: result = b64encode(result).decode("utf-8") diff --git a/providers/tests/sftp/operators/test_sftp.py b/providers/tests/sftp/operators/test_sftp.py index 96d43145cc163..2bd4be3d269e6 100644 --- a/providers/tests/sftp/operators/test_sftp.py +++ b/providers/tests/sftp/operators/test_sftp.py @@ -36,6 +36,7 @@ from airflow.utils import timezone from airflow.utils.timezone import datetime +from tests_common.test_utils.compat import AIRFLOW_V_3_0_PLUS from tests_common.test_utils.config import conf_vars pytestmark = pytest.mark.db_test @@ -95,6 +96,7 @@ def teardown_method(self): if os.path.exists(self.test_remote_dir): os.rmdir(self.test_remote_dir) + @pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Pickle support is removed in Airflow 3") @conf_vars({("core", "enable_xcom_pickling"): "True"}) def test_pickle_file_transfer_put(self, dag_maker): test_local_file_content = ( @@ -129,6 +131,7 @@ def test_pickle_file_transfer_put(self, dag_maker): pulled = 
tis["check_file_task"].xcom_pull(task_ids="check_file_task", key="return_value") assert pulled.strip() == test_local_file_content + @pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Pickle support is removed in Airflow 3") @conf_vars({("core", "enable_xcom_pickling"): "True"}) def test_file_transfer_no_intermediate_dir_error_put(self, create_task_instance_of_operator): test_local_file_content = ( @@ -158,6 +161,7 @@ def test_file_transfer_no_intermediate_dir_error_put(self, create_task_instance_ ti2.run() assert "No such file" in str(ctx.value) + @pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Pickle support is removed in Airflow 3") @conf_vars({("core", "enable_xcom_pickling"): "True"}) def test_file_transfer_with_intermediate_dir_put(self, dag_maker): test_local_file_content = ( @@ -232,6 +236,7 @@ def create_remote_file_and_cleanup(self): yield os.remove(self.test_remote_filepath) + @pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Pickle support is removed in Airflow 3") @conf_vars({("core", "enable_xcom_pickling"): "True"}) def test_pickle_file_transfer_get(self, dag_maker, create_remote_file_and_cleanup): with dag_maker(dag_id="unit_tests_sftp_op_pickle_file_transfer_get"): @@ -275,6 +280,7 @@ def test_json_file_transfer_get(self, dag_maker, create_remote_file_and_cleanup) content_received = file.read() assert content_received == self.test_remote_file_content + @pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Pickle support is removed in Airflow 3") @conf_vars({("core", "enable_xcom_pickling"): "True"}) def test_file_transfer_no_intermediate_dir_error_get(self, dag_maker, create_remote_file_and_cleanup): with dag_maker(dag_id="unit_tests_sftp_op_file_transfer_no_intermediate_dir_error_get"): @@ -298,6 +304,7 @@ def test_file_transfer_no_intermediate_dir_error_get(self, dag_maker, create_rem ti.run() assert "No such file" in str(ctx.value) + @pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Pickle support is removed in Airflow 3") @conf_vars({("core", 
"enable_xcom_pickling"): "True"}) def test_file_transfer_with_intermediate_dir_error_get(self, dag_maker, create_remote_file_and_cleanup): with dag_maker(dag_id="unit_tests_sftp_op_file_transfer_with_intermediate_dir_error_get"): diff --git a/tests/api_connexion/endpoints/test_xcom_endpoint.py b/tests/api_connexion/endpoints/test_xcom_endpoint.py index aa767ae168b02..000d509fc9b80 100644 --- a/tests/api_connexion/endpoints/test_xcom_endpoint.py +++ b/tests/api_connexion/endpoints/test_xcom_endpoint.py @@ -158,36 +158,6 @@ def test_should_respond_200_native(self): "value": {"key": "value"}, } - @conf_vars({("core", "enable_xcom_pickling"): "True"}) - def test_should_respond_200_native_for_pickled(self): - dag_id = "test-dag-id" - task_id = "test-task-id" - logical_date = "2005-04-02T00:00:00+00:00" - xcom_key = "test-xcom-key" - logical_date_parsed = timezone.parse(logical_date) - run_id = DagRun.generate_run_id(DagRunType.MANUAL, logical_date_parsed) - value_non_serializable_key = {("201009_NB502104_0421_AHJY23BGXG (SEQ_WF: 138898)", None): 82359} - self._create_xcom_entry( - dag_id, run_id, logical_date_parsed, task_id, xcom_key, {"key": value_non_serializable_key} - ) - response = self.client.get( - f"/api/v1/dags/{dag_id}/dagRuns/{run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}", - environ_overrides={"REMOTE_USER": "test"}, - ) - assert 200 == response.status_code - - current_data = response.json - current_data["timestamp"] = "TIMESTAMP" - assert current_data == { - "dag_id": dag_id, - "logical_date": logical_date, - "key": xcom_key, - "task_id": task_id, - "map_index": -1, - "timestamp": "TIMESTAMP", - "value": f"{{'key': {str(value_non_serializable_key)}}}", - } - def test_should_raise_404_for_non_existent_xcom(self): dag_id = "test-dag-id" task_id = "test-task-id" diff --git a/tests/api_connexion/schemas/test_xcom_schema.py b/tests/api_connexion/schemas/test_xcom_schema.py index 24ce27efcf46f..fd978f3ac8fe3 100644 --- 
a/tests/api_connexion/schemas/test_xcom_schema.py +++ b/tests/api_connexion/schemas/test_xcom_schema.py @@ -16,8 +16,6 @@ # under the License. from __future__ import annotations -import pickle - import pytest from sqlalchemy import or_, select @@ -25,14 +23,11 @@ XComCollection, xcom_collection_item_schema, xcom_collection_schema, - xcom_schema_string, ) from airflow.models import DagRun, XCom from airflow.utils import timezone from airflow.utils.session import create_session -from tests_common.test_utils.config import conf_vars - pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode] @@ -184,49 +179,3 @@ def test_serialize(self, create_xcom, session): "total_entries": 2, }, ) - - -class TestXComSchema: - default_time = "2016-04-02T21:00:00+00:00" - default_time_parsed = timezone.parse(default_time) - - @conf_vars({("core", "enable_xcom_pickling"): "True"}) - def test_serialize(self, create_xcom, session): - create_xcom( - dag_id="test_dag", - task_id="test_task_id", - logical_date=self.default_time_parsed, - key="test_key", - value=pickle.dumps(b"test_binary"), - ) - xcom_model = session.query(XCom).first() - deserialized_xcom = xcom_schema_string.dump(xcom_model) - assert deserialized_xcom == { - "key": "test_key", - "timestamp": self.default_time, - "logical_date": self.default_time, - "task_id": "test_task_id", - "dag_id": "test_dag", - "value": "test_binary", - "map_index": -1, - } - - @conf_vars({("core", "enable_xcom_pickling"): "True"}) - def test_deserialize(self): - xcom_dump = { - "key": "test_key", - "timestamp": self.default_time, - "logical_date": self.default_time, - "task_id": "test_task_id", - "dag_id": "test_dag", - "value": b"test_binary", - } - result = xcom_schema_string.load(xcom_dump) - assert result == { - "key": "test_key", - "timestamp": self.default_time_parsed, - "logical_date": self.default_time_parsed, - "task_id": "test_task_id", - "dag_id": "test_dag", - "value": "test_binary", - } diff --git 
a/tests/api_fastapi/core_api/routes/public/test_xcom.py b/tests/api_fastapi/core_api/routes/public/test_xcom.py index b35cfe8ab331f..e0010c79fef1a 100644 --- a/tests/api_fastapi/core_api/routes/public/test_xcom.py +++ b/tests/api_fastapi/core_api/routes/public/test_xcom.py @@ -36,8 +36,6 @@ TEST_XCOM_KEY = "test_xcom_key" TEST_XCOM_VALUE = {"key": "value"} -TEST_XCOM_KEY2 = "test_xcom_key_non_serializable" -TEST_XCOM_VALUE2 = {"key": {("201009_NB502104_0421_AHJY23BGXG (SEQ_WF: 138898)", None): 82359}} TEST_XCOM_KEY3 = "test_xcom_key_non_existing" TEST_DAG_ID = "test-dag-id" @@ -140,25 +138,6 @@ def test_should_respond_200_native(self, test_client): "value": TEST_XCOM_VALUE, } - @conf_vars({("core", "enable_xcom_pickling"): "True"}) - def test_should_respond_200_pickled(self, test_client): - self.create_xcom(TEST_XCOM_KEY2, TEST_XCOM_VALUE2) - response = test_client.get( - f"/public/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries/{TEST_XCOM_KEY2}" - ) - assert response.status_code == 200 - - current_data = response.json() - assert current_data == { - "dag_id": TEST_DAG_ID, - "logical_date": logical_date_parsed.strftime("%Y-%m-%dT%H:%M:%SZ"), - "key": TEST_XCOM_KEY2, - "task_id": TEST_TASK_ID, - "map_index": -1, - "timestamp": current_data["timestamp"], - "value": str(TEST_XCOM_VALUE2), - } - def test_should_raise_404_for_non_existent_xcom(self, test_client): response = test_client.get( f"/public/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries/{TEST_XCOM_KEY3}" diff --git a/tests/models/test_xcom.py b/tests/models/test_xcom.py index d51cf956f09b3..5c515f6562c76 100644 --- a/tests/models/test_xcom.py +++ b/tests/models/test_xcom.py @@ -18,7 +18,6 @@ import datetime import operator -import os from typing import TYPE_CHECKING from unittest import mock from unittest.mock import MagicMock @@ -28,7 +27,6 @@ from airflow.configuration import conf from airflow.models.dagrun import DagRun, DagRunType from 
airflow.models.taskinstance import TaskInstance -from airflow.models.taskinstancekey import TaskInstanceKey from airflow.models.xcom import BaseXCom, XCom, resolve_xcom_backend from airflow.operators.empty import EmptyOperator from airflow.settings import json @@ -109,13 +107,12 @@ def test_resolve_xcom_class(self): cls = resolve_xcom_backend() assert issubclass(cls, CustomXCom) - @conf_vars({("core", "xcom_backend"): "", ("core", "enable_xcom_pickling"): "False"}) + @conf_vars({("core", "xcom_backend"): ""}) def test_resolve_xcom_class_fallback_to_basexcom(self): cls = resolve_xcom_backend() assert issubclass(cls, BaseXCom) assert cls.serialize_value([1]) == b"[1]" - @conf_vars({("core", "enable_xcom_pickling"): "False"}) @conf_vars({("core", "xcom_backend"): "to be removed"}) def test_resolve_xcom_class_fallback_to_basexcom_no_config(self): conf.remove_option("core", "xcom_backend") @@ -123,63 +120,6 @@ def test_resolve_xcom_class_fallback_to_basexcom_no_config(self): assert issubclass(cls, BaseXCom) assert cls.serialize_value([1]) == b"[1]" - def test_xcom_deserialize_with_json_to_pickle_switch(self, task_instance, session): - ti_key = TaskInstanceKey( - dag_id=task_instance.dag_id, - task_id=task_instance.task_id, - run_id=task_instance.run_id, - ) - with conf_vars({("core", "enable_xcom_pickling"): "False"}): - XCom.set( - key="xcom_test3", - value={"key": "value"}, - dag_id=task_instance.dag_id, - task_id=task_instance.task_id, - run_id=task_instance.run_id, - session=session, - ) - with conf_vars({("core", "enable_xcom_pickling"): "True"}): - ret_value = XCom.get_value(key="xcom_test3", ti_key=ti_key, session=session) - assert ret_value == {"key": "value"} - - @pytest.mark.skip_if_database_isolation_mode - def test_xcom_deserialize_pickle_when_xcom_pickling_is_disabled(self, task_instance, session): - with conf_vars({("core", "enable_xcom_pickling"): "True"}): - XCom.set( - key="xcom_test3", - value={"key": "value"}, - dag_id=task_instance.dag_id, - 
task_id=task_instance.task_id, - run_id=task_instance.run_id, - session=session, - ) - with conf_vars({("core", "enable_xcom_pickling"): "False"}): - with pytest.raises(UnicodeDecodeError): - XCom.get_one( - key="xcom_test3", - dag_id=task_instance.dag_id, - task_id=task_instance.task_id, - run_id=task_instance.run_id, - session=session, - ) - - @pytest.mark.skip_if_database_isolation_mode - @conf_vars({("core", "xcom_enable_pickling"): "False"}) - def test_xcom_disable_pickle_type_fail_on_non_json(self, task_instance, session): - class PickleRce: - def __reduce__(self): - return os.system, ("ls -alt",) - - with pytest.raises(TypeError): - XCom.set( - key="xcom_test3", - value=PickleRce(), - dag_id=task_instance.dag_id, - task_id=task_instance.task_id, - run_id=task_instance.run_id, - session=session, - ) - @mock.patch("airflow.models.xcom.XCom.orm_deserialize_value") def test_xcom_init_on_load_uses_orm_deserialize_value(self, mock_orm_deserialize): instance = BaseXCom( @@ -216,7 +156,6 @@ def test_get_one_custom_backend_no_use_orm_deserialize_value(self, task_instance XCom.orm_deserialize_value.assert_not_called() @pytest.mark.skip_if_database_isolation_mode - @conf_vars({("core", "enable_xcom_pickling"): "False"}) @mock.patch("airflow.models.xcom.conf.getimport") def test_set_serialize_call_current_signature(self, get_import, task_instance): """ @@ -266,17 +205,6 @@ def serialize_value( ) -@pytest.fixture( - params=[ - pytest.param("true", id="enable_xcom_pickling=true"), - pytest.param("false", id="enable_xcom_pickling=false"), - ], -) -def setup_xcom_pickling(request): - with conf_vars({("core", "enable_xcom_pickling"): str(request.param)}): - yield - - @pytest.fixture def push_simple_json_xcom(session): def func(*, ti: TaskInstance, key: str, value): @@ -292,7 +220,6 @@ def func(*, ti: TaskInstance, key: str, value): return func -@pytest.mark.usefixtures("setup_xcom_pickling") class TestXComGet: @pytest.fixture def setup_for_xcom_get_one(self, task_instance, 
push_simple_json_xcom): @@ -403,7 +330,6 @@ def test_xcom_get_many_from_prior_dates(self, session, tis_for_xcom_get_many_fro assert [x.logical_date for x in stored_xcoms] == [ti2.logical_date, ti1.logical_date] -@pytest.mark.usefixtures("setup_xcom_pickling") class TestXComSet: def test_xcom_set(self, session, task_instance): XCom.set( @@ -439,7 +365,6 @@ def test_xcom_set_again_replace(self, session, task_instance): assert session.query(XCom).one().value == {"key2": "value2"} -@pytest.mark.usefixtures("setup_xcom_pickling") class TestXComClear: @pytest.fixture def setup_for_xcom_clear(self, task_instance, push_simple_json_xcom):