From 46bc25577fd5d6737417b71ee706fbfc22b8ceb6 Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Sat, 4 May 2024 18:36:26 +0200 Subject: [PATCH 1/7] Implement a pending status for tasks. --- docs/source/changes.md | 7 ++++--- docs/source/coiled.md | 2 +- src/pytask_parallel/execute.py | 37 +++++++++++++++++++++------------- 3 files changed, 28 insertions(+), 18 deletions(-) diff --git a/docs/source/changes.md b/docs/source/changes.md index bcaa1a6..0c2da68 100644 --- a/docs/source/changes.md +++ b/docs/source/changes.md @@ -19,12 +19,13 @@ releases are available on [PyPI](https://pypi.org/project/pytask-parallel) and or processes automatically. - {pull}`96` handles local paths with remote executors. `PathNode`s are not supported as dependencies or products (except for return annotations). -- {pull}`99` changes that all tasks that are ready are being scheduled. It improves - interactions with adaptive scaling. {issue}`98` does handle the resulting issues: no - strong adherence to priorities, no pending status. +- {pull}`99` changes that all ready tasks are being scheduled. It improves interactions + with adaptive scaling. {issue}`98` does handle the resulting issues: no strong + adherence to priorities, no pending status. - {pull}`100` adds project management with rye. - {pull}`101` adds syncing for local paths as dependencies or products in remote environments with the same OS. +- {pull}`102` implements a pending status for scheduled but not started tasks. ## 0.4.1 - 2024-01-12 diff --git a/docs/source/coiled.md b/docs/source/coiled.md index 5463f7b..f53912e 100644 --- a/docs/source/coiled.md +++ b/docs/source/coiled.md @@ -1,7 +1,7 @@ # coiled ```{caution} -Currently, the coiled backend can only be used if your workflow code is organized in a +Currently, the coiled backend can only be used if your workflow code is organized as a package due to how pytask imports your code and dask serializes task functions ([issue](https://github.com/dask/distributed/issues/8607)). 
``` diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index 1ee0d71..afb9572 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -16,6 +16,7 @@ from pytask import PTask from pytask import PythonNode from pytask import Session +from pytask import TaskExecutionStatus from pytask import console from pytask import get_marks from pytask import hookimpl @@ -53,6 +54,9 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 reports = session.execution_reports running_tasks: dict[str, Future[Any]] = {} + # Get the live execution manager from the registry if it exists. + live_execution = session.config["pm"].get_plugin("live_execution") + # The executor can only be created after the collection to give users the # possibility to inject their own executors. session.config["_parallel_executor"] = registry.get_parallel_backend( @@ -68,17 +72,17 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 newly_collected_reports = [] ready_tasks = list(session.scheduler.get_ready(10_000)) - for task_name in ready_tasks: - task = session.dag.nodes[task_name]["task"] + for task_signature in ready_tasks: + task = session.dag.nodes[task_signature]["task"] session.hook.pytask_execute_task_log_start( - session=session, task=task + session=session, task=task, status=TaskExecutionStatus.PENDING ) try: session.hook.pytask_execute_task_setup( session=session, task=task ) - running_tasks[task_name] = session.hook.pytask_execute_task( - session=session, task=task + running_tasks[task_signature] = ( + session.hook.pytask_execute_task(session=session, task=task) ) sleeper.reset() except Exception: # noqa: BLE001 @@ -86,13 +90,13 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 task, sys.exc_info() ) newly_collected_reports.append(report) - session.scheduler.done(task_name) + session.scheduler.done(task_signature) if not ready_tasks: sleeper.increment() 
- for task_name in list(running_tasks): - future = running_tasks[task_name] + for task_signature in list(running_tasks): + future = running_tasks[task_signature] if future.done(): wrapper_result = parse_future_result(future) @@ -108,17 +112,17 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 ) if wrapper_result.exc_info is not None: - task = session.dag.nodes[task_name]["task"] + task = session.dag.nodes[task_signature]["task"] newly_collected_reports.append( ExecutionReport.from_task_and_exception( task, wrapper_result.exc_info, # type: ignore[arg-type] ) ) - running_tasks.pop(task_name) - session.scheduler.done(task_name) + running_tasks.pop(task_signature) + session.scheduler.done(task_signature) else: - task = session.dag.nodes[task_name]["task"] + task = session.dag.nodes[task_signature]["task"] _update_carry_over_products( task, wrapper_result.carry_over_products ) @@ -134,9 +138,14 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 else: report = ExecutionReport.from_task(task) - running_tasks.pop(task_name) + running_tasks.pop(task_signature) newly_collected_reports.append(report) - session.scheduler.done(task_name) + session.scheduler.done(task_signature) + + elif live_execution and future.running(): + live_execution.update_task( + task_signature, status=TaskExecutionStatus.RUNNING + ) for report in newly_collected_reports: session.hook.pytask_execute_task_process_report( From 7131a1f8c44f0449e46fe9d7048b2089c3590aa4 Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Tue, 7 May 2024 21:00:42 +0200 Subject: [PATCH 2/7] Leftover commit. 
--- src/pytask_parallel/execute.py | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index afb9572..292aae2 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -62,7 +62,6 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 session.config["_parallel_executor"] = registry.get_parallel_backend( session.config["parallel_backend"], n_workers=session.config["n_workers"] ) - with session.config["_parallel_executor"]: sleeper = _Sleeper() @@ -142,10 +141,14 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 newly_collected_reports.append(report) session.scheduler.done(task_signature) - elif live_execution and future.running(): - live_execution.update_task( - task_signature, status=TaskExecutionStatus.RUNNING + elif not future.done(): + pass + elif live_execution: + status = _get_status_from_undone_task( + task_signature, future, session.config["_parallel_executor"] ) + if status == TaskExecutionStatus.RUNNING: + live_execution.update_task(task_signature, status=status) for report in newly_collected_reports: session.hook.pytask_execute_task_process_report( @@ -299,3 +302,15 @@ def increment(self) -> None: def sleep(self) -> None: time.sleep(self.timings[self.timing_idx]) + + +def _get_status_from_undone_task( + task_signature: str, future: Future, executor: Any +) -> TaskExecutionStatus: + """Get the status of a task that is undone.""" + if hasattr(future, "_state"): + status = future._state + if status == "RUNNING": + breakpoint() + return TaskExecutionStatus.RUNNING + return TaskExecutionStatus.PENDING From b73ed65382710c719575296571be2d345ca4fa47 Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Tue, 7 May 2024 21:32:45 +0200 Subject: [PATCH 3/7] Add correct pending status for cf and loky. 
--- src/pytask_parallel/execute.py | 44 ++++++++++++++++----------------- src/pytask_parallel/wrappers.py | 23 ++++++++++++++++- 2 files changed, 44 insertions(+), 23 deletions(-) diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index 292aae2..e471c5c 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -2,6 +2,7 @@ from __future__ import annotations +import multiprocessing import sys import time from typing import TYPE_CHECKING @@ -24,6 +25,7 @@ from pytask.tree_util import tree_map from pytask.tree_util import tree_structure +from pytask_parallel.backends import ParallelBackend from pytask_parallel.backends import WorkerType from pytask_parallel.backends import registry from pytask_parallel.typing import CarryOverPath @@ -53,6 +55,7 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 __tracebackhide__ = True reports = session.execution_reports running_tasks: dict[str, Future[Any]] = {} + sleeper = _Sleeper() # Get the live execution manager from the registry if it exists. live_execution = session.config["pm"].get_plugin("live_execution") @@ -63,7 +66,14 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 session.config["parallel_backend"], n_workers=session.config["n_workers"] ) with session.config["_parallel_executor"]: - sleeper = _Sleeper() + # Create a shared memory object to differentiate between running and pending + # tasks. 
+ if session.config["parallel_backend"] in ( + ParallelBackend.PROCESSES, + ParallelBackend.THREADS, + ParallelBackend.LOKY, + ): + session.config["_shared_memory"] = multiprocessing.Manager().dict() i = 0 while session.scheduler.is_active(): @@ -141,14 +151,11 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 newly_collected_reports.append(report) session.scheduler.done(task_signature) - elif not future.done(): - pass - elif live_execution: - status = _get_status_from_undone_task( - task_signature, future, session.config["_parallel_executor"] - ) - if status == TaskExecutionStatus.RUNNING: - live_execution.update_task(task_signature, status=status) + elif live_execution and "_shared_memory" in session.config: + if task_signature in session.config["_shared_memory"]: + live_execution.update_task( + task_signature, status=TaskExecutionStatus.RUNNING + ) for report in newly_collected_reports: session.hook.pytask_execute_task_process_report( @@ -228,6 +235,7 @@ def pytask_execute_task(session: Session, task: PTask) -> Future[WrapperResult]: kwargs=kwargs, remote=remote, session_filterwarnings=session.config["filterwarnings"], + shared_memory=session.config["_shared_memory"], show_locals=session.config["show_locals"], task_filterwarnings=get_marks(task, "filterwarnings"), ) @@ -236,7 +244,11 @@ def pytask_execute_task(session: Session, task: PTask) -> Future[WrapperResult]: from pytask_parallel.wrappers import wrap_task_in_thread return session.config["_parallel_executor"].submit( - wrap_task_in_thread, task=task, remote=False, **kwargs + wrap_task_in_thread, + task=task, + remote=False, + shared_memory=session.config["_shared_memory"], + **kwargs, ) msg = f"Unknown worker type {worker_type}" raise ValueError(msg) @@ -302,15 +314,3 @@ def increment(self) -> None: def sleep(self) -> None: time.sleep(self.timings[self.timing_idx]) - - -def _get_status_from_undone_task( - task_signature: str, future: Future, executor: Any -) -> 
TaskExecutionStatus: - """Get the status of a task that is undone.""" - if hasattr(future, "_state"): - status = future._state - if status == "RUNNING": - breakpoint() - return TaskExecutionStatus.RUNNING - return TaskExecutionStatus.PENDING diff --git a/src/pytask_parallel/wrappers.py b/src/pytask_parallel/wrappers.py index 6157873..ad8c2ed 100644 --- a/src/pytask_parallel/wrappers.py +++ b/src/pytask_parallel/wrappers.py @@ -55,7 +55,9 @@ class WrapperResult: stderr: str -def wrap_task_in_thread(task: PTask, *, remote: bool, **kwargs: Any) -> WrapperResult: +def wrap_task_in_thread( + task: PTask, *, remote: bool, shared_memory: dict[str, bool] | None, **kwargs: Any +) -> WrapperResult: """Mock execution function such that it returns the same as for processes. The function for processes returns ``warning_reports`` and an ``exception``. With @@ -64,6 +66,11 @@ def wrap_task_in_thread(task: PTask, *, remote: bool, **kwargs: Any) -> WrapperR """ __tracebackhide__ = True + + # Add task to shared memory to indicate that it is currently being executed. + if shared_memory is not None: + shared_memory[task.signature] = True + try: out = task.function(**kwargs) except Exception: # noqa: BLE001 @@ -71,6 +78,11 @@ def wrap_task_in_thread(task: PTask, *, remote: bool, **kwargs: Any) -> WrapperR else: _handle_function_products(task, out, remote=remote) exc_info = None # type: ignore[assignment] + + # Remove task from shared memory to indicate that it is no longer being executed. 
+ if shared_memory is not None: + shared_memory.pop(task.signature) + return WrapperResult( carry_over_products=None, # type: ignore[arg-type] warning_reports=[], @@ -87,6 +99,7 @@ def wrap_task_in_process( # noqa: PLR0913 kwargs: dict[str, Any], remote: bool, session_filterwarnings: tuple[str, ...], + shared_memory: dict[str, bool] | None, show_locals: bool, task_filterwarnings: tuple[Mark, ...], ) -> WrapperResult: @@ -99,6 +112,10 @@ def wrap_task_in_process( # noqa: PLR0913 # Hide this function from tracebacks. __tracebackhide__ = True + # Add task to shared memory to indicate that it is currently being executed. + if shared_memory is not None: + shared_memory[task.signature] = True + # Patch set_trace and breakpoint to show a better error message. _patch_set_trace_and_breakpoint() @@ -156,6 +173,10 @@ def wrap_task_in_process( # noqa: PLR0913 captured_stdout_buffer.close() captured_stderr_buffer.close() + # Remove task from shared memory to indicate that it is no longer being executed. + if shared_memory is not None: + shared_memory.pop(task.signature) + return WrapperResult( carry_over_products=products, # type: ignore[arg-type] warning_reports=warning_reports, From 90e05a2d5b1eb67d21f1163f70e357c512f720fe Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Tue, 7 May 2024 21:38:05 +0200 Subject: [PATCH 4/7] Fix tests. 
--- src/pytask_parallel/execute.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index e471c5c..363b48c 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -235,7 +235,7 @@ def pytask_execute_task(session: Session, task: PTask) -> Future[WrapperResult]: kwargs=kwargs, remote=remote, session_filterwarnings=session.config["filterwarnings"], - shared_memory=session.config["_shared_memory"], + shared_memory=session.config.get("_shared_memory"), show_locals=session.config["show_locals"], task_filterwarnings=get_marks(task, "filterwarnings"), ) @@ -247,7 +247,7 @@ def pytask_execute_task(session: Session, task: PTask) -> Future[WrapperResult]: wrap_task_in_thread, task=task, remote=False, - shared_memory=session.config["_shared_memory"], + shared_memory=session.config.get("_shared_memory"), **kwargs, ) msg = f"Unknown worker type {worker_type}" From 1e2006c04c194934cded892c86ea17817544c734 Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Tue, 7 May 2024 21:47:43 +0200 Subject: [PATCH 5/7] Fix. --- src/pytask_parallel/execute.py | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index 363b48c..c789b16 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -57,6 +57,18 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 running_tasks: dict[str, Future[Any]] = {} sleeper = _Sleeper() + # Create a shared memory object to differentiate between running and pending + # tasks for some parallel backends. 
+ if session.config["parallel_backend"] in ( + ParallelBackend.PROCESSES, + ParallelBackend.THREADS, + ParallelBackend.LOKY, + ): + session.config["_shared_memory"] = multiprocessing.Manager().dict() + start_execution_state = TaskExecutionStatus.PENDING + else: + start_execution_state = TaskExecutionStatus.RUNNING + # Get the live execution manager from the registry if it exists. live_execution = session.config["pm"].get_plugin("live_execution") @@ -66,15 +78,6 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 session.config["parallel_backend"], n_workers=session.config["n_workers"] ) with session.config["_parallel_executor"]: - # Create a shared memory object to differentiate between running and pending - # tasks. - if session.config["parallel_backend"] in ( - ParallelBackend.PROCESSES, - ParallelBackend.THREADS, - ParallelBackend.LOKY, - ): - session.config["_shared_memory"] = multiprocessing.Manager().dict() - i = 0 while session.scheduler.is_active(): try: @@ -84,7 +87,7 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 for task_signature in ready_tasks: task = session.dag.nodes[task_signature]["task"] session.hook.pytask_execute_task_log_start( - session=session, task=task, status=TaskExecutionStatus.PENDING + session=session, task=task, status=start_execution_state ) try: session.hook.pytask_execute_task_setup( From 89ff770f05d1889a2a12688a9b1ff6712bfd2065 Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Tue, 7 May 2024 23:02:50 +0200 Subject: [PATCH 6/7] fix. 
--- src/pytask_parallel/execute.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index c789b16..79ba551 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -5,6 +5,7 @@ import multiprocessing import sys import time +from contextlib import ExitStack from typing import TYPE_CHECKING from typing import Any @@ -64,9 +65,10 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 ParallelBackend.THREADS, ParallelBackend.LOKY, ): - session.config["_shared_memory"] = multiprocessing.Manager().dict() + manager_cls = multiprocessing.Manager start_execution_state = TaskExecutionStatus.PENDING else: + manager_cls = ExitStack start_execution_state = TaskExecutionStatus.RUNNING # Get the live execution manager from the registry if it exists. @@ -77,7 +79,14 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 session.config["_parallel_executor"] = registry.get_parallel_backend( session.config["parallel_backend"], n_workers=session.config["n_workers"] ) - with session.config["_parallel_executor"]: + with session.config["_parallel_executor"], manager_cls() as manager: + if session.config["parallel_backend"] in ( + ParallelBackend.PROCESSES, + ParallelBackend.THREADS, + ParallelBackend.LOKY, + ): + session.config["_shared_memory"] = manager.dict() + i = 0 while session.scheduler.is_active(): try: From ba20e229aa49062a4d59f9e383427ca490dc683f Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Tue, 7 May 2024 23:24:53 +0200 Subject: [PATCH 7/7] Fix types. 
--- src/pytask_parallel/execute.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index 79ba551..135d31e 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -2,12 +2,13 @@ from __future__ import annotations -import multiprocessing import sys import time from contextlib import ExitStack +from multiprocessing import Manager from typing import TYPE_CHECKING from typing import Any +from typing import Callable import cloudpickle from _pytask.node_protocols import PPathNode @@ -37,6 +38,7 @@ if TYPE_CHECKING: from concurrent.futures import Future + from multiprocessing.managers import SyncManager from pytask_parallel.wrappers import WrapperResult @@ -65,7 +67,7 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 ParallelBackend.THREADS, ParallelBackend.LOKY, ): - manager_cls = multiprocessing.Manager + manager_cls: Callable[[], SyncManager] | type[ExitStack] = Manager start_execution_state = TaskExecutionStatus.PENDING else: manager_cls = ExitStack @@ -85,7 +87,7 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 ParallelBackend.THREADS, ParallelBackend.LOKY, ): - session.config["_shared_memory"] = manager.dict() + session.config["_shared_memory"] = manager.dict() # type: ignore[union-attr] i = 0 while session.scheduler.is_active():