Skip to content

Commit 4f6c127

Browse files
committed
Fix KubernetesExecutor test
- Previous test failure is cuased by cache state of executor_instances - Should set ti.state = RUNNING after ti.run
1 parent c8d6ea2 commit 4f6c127

File tree

2 files changed

+11
-2
lines changed

2 files changed

+11
-2
lines changed

providers/tests/cncf/kubernetes/log_handlers/test_log_handlers.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343

4444
from tests_common.test_utils.compat import PythonOperator
4545
from tests_common.test_utils.config import conf_vars
46+
from tests_common.test_utils.executor_loader import clean_executor_loader
4647
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
4748

4849
if AIRFLOW_V_3_0_PLUS:
@@ -76,6 +77,7 @@ def teardown_method(self):
7677
@pytest.mark.parametrize("state", [TaskInstanceState.RUNNING, TaskInstanceState.SUCCESS])
7778
def test__read_for_k8s_executor(self, mock_k8s_get_task_log, create_task_instance, state):
7879
"""Test for k8s executor, the log is read from get_task_log method"""
80+
clean_executor_loader()
7981
mock_k8s_get_task_log.return_value = ([], [])
8082
executor_name = "KubernetesExecutor"
8183
ti = create_task_instance(
@@ -86,6 +88,7 @@ def test__read_for_k8s_executor(self, mock_k8s_get_task_log, create_task_instanc
8688
)
8789
ti.state = state
8890
ti.triggerer_job = None
91+
ti.executor = executor_name
8992
with conf_vars({("core", "executor"): executor_name}):
9093
reload(executor_loader)
9194
fth = FileTaskHandler("")
@@ -105,11 +108,12 @@ def test__read_for_k8s_executor(self, mock_k8s_get_task_log, create_task_instanc
105108
pytest.param(k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="pod-name-xxx")), "default"),
106109
],
107110
)
108-
@patch.dict("os.environ", AIRFLOW__CORE__EXECUTOR="KubernetesExecutor")
111+
@conf_vars({("core", "executor"): "KubernetesExecutor"})
109112
@patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
110113
def test_read_from_k8s_under_multi_namespace_mode(
111114
self, mock_kube_client, pod_override, namespace_to_call
112115
):
116+
reload(executor_loader)
113117
mock_read_log = mock_kube_client.return_value.read_namespaced_pod_log
114118
mock_list_pod = mock_kube_client.return_value.list_namespaced_pod
115119

@@ -139,6 +143,7 @@ def task_callable(ti):
139143
)
140144
ti = TaskInstance(task=task, run_id=dagrun.run_id)
141145
ti.try_number = 3
146+
ti.executor = "KubernetesExecutor"
142147

143148
logger = ti.log
144149
ti.log.disabled = False
@@ -147,6 +152,8 @@ def task_callable(ti):
147152
set_context(logger, ti)
148153
ti.run(ignore_ti_state=True)
149154
ti.state = TaskInstanceState.RUNNING
155+
# clear executor_instances cache
156+
file_handler.executor_instances = {}
150157
file_handler.read(ti, 2)
151158

152159
# first we find pod name

tests/utils/test_log_handlers.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,6 @@ def test_file_task_handler_with_multiple_executors(
238238
)
239239
if executor_name is not None:
240240
ti.executor = executor_name
241-
ti.state = TaskInstanceState.RUNNING
242241
ti.try_number = 1
243242
logger = ti.log
244243
ti.log.disabled = False
@@ -249,13 +248,16 @@ def test_file_task_handler_with_multiple_executors(
249248
assert file_handler is not None
250249

251250
set_context(logger, ti)
251+
# clear executor_instances cache
252+
file_handler.executor_instances = {}
252253
assert file_handler.handler is not None
253254
# We expect set_context generates a file locally.
254255
log_filename = file_handler.handler.baseFilename
255256
assert os.path.isfile(log_filename)
256257
assert log_filename.endswith("1.log"), log_filename
257258

258259
ti.run(ignore_ti_state=True)
260+
ti.state = TaskInstanceState.RUNNING
259261

260262
file_handler.flush()
261263
file_handler.close()

0 commit comments

Comments
 (0)