From db5337481aeff1b1eb5226ccfb2b2a4f53817cd4 Mon Sep 17 00:00:00 2001 From: pwwang Date: Wed, 31 Jul 2024 20:54:10 -0700 Subject: [PATCH] feat: expose on_jobcmd_* hooks for plugins to modify the job wrapper script --- docs/plugin.md | 49 ++++++++++++++++++++++ pipen/pluginmgr.py | 97 ++++++++++++++++++++++++++++++++++++++++++++ tests/test_plugin.py | 34 ++++++++++++++++ 3 files changed, 180 insertions(+) diff --git a/docs/plugin.md b/docs/plugin.md index 45f19198..70e5303e 100644 --- a/docs/plugin.md +++ b/docs/plugin.md @@ -125,6 +125,55 @@ See [`simplug`][1] for more details. When a job is done but failed (i.e. return_code == 1). +- `on_jobcmd_init(job) -> str` (sync) + + When the job command wrapper script is initialized before the prescript is run + + This should return a piece of bash code to be inserted in the wrapped job + script (template), which is a python template string, with the following + variables available: `status` and `job`. `status` is the class `JobStatus` from + `xqute.defaults.py` and `job` is the `Job` instance. + + For multiple plugins, the code will be inserted in the order of the plugin priority. + + The code will replace the `#![jobcmd_init]` placeholder in the wrapped job script. + See also + +- `on_jobcmd_prep(job) -> str` (sync) + + When the job command right about to be run + + This should return a piece of bash code to be inserted in the wrapped job + script (template), which is a python template string, with the following + variables available: `status` and `job`. `status` is the class `JobStatus` from + `xqute.defaults.py` and `job` is the `Job` instance. + + The bash variable `$cmd` is accessible in the context. It is also possible to + modify the `cmd` variable. Just remember to assign the modified value to `cmd`. + + For multiple plugins, the code will be inserted in the order of the plugin priority. + Keep in mind that the `$cmd` may be modified by other plugins. + + The code will replace the `#![jobcmd_prep]` placeholder in the wrapped job script. + See also + +- `on_jobcmd_end(job) -> str` (sync): + + When the job command finishes and after the postscript is run + + This should return a piece of bash code to be inserted in the wrapped job + script (template), which is a python template string, with the following + variables available: `status` and `job`. `status` is the class `JobStatus` from + `xqute.defaults.py` and `job` is the `Job` instance. + + The bash variable `$rc` is accessible in the context, which is the return code + of the job command. + + For multiple plugins, the code will be inserted in the order of the plugin priority. + + The code will replace the `#![jobcmd_end]` placeholder in the wrapped job script. + See also + #### IO hooks The io hooks are used to handle the input/output files/directories. The idea is to provide more flexibility to fetch the last modified time of the files/directories, and remove the files/directories when the job restarts. The APIs of these types of plugins are primarily used to generate the cache signature for the jobs. There are 5 APIs that need to be implemented: diff --git a/pipen/pluginmgr.py b/pipen/pluginmgr.py index 5f923fa2..3da70303 100644 --- a/pipen/pluginmgr.py +++ b/pipen/pluginmgr.py @@ -419,6 +419,79 @@ async def output_exists(job: Job, path: str, is_dir: bool) -> bool: """ +@plugin.spec(result=SimplugResult.ALL_AVAILS) +def on_jobcmd_init(job: Job) -> str: + """When the job command wrapper script is initialized before the prescript is run + + This should return a piece of bash code to be inserted in the wrapped job + script (template), which is a python template string, with the following + variables available: `status` and `job`. `status` is the class `JobStatus` from + `xqute.defaults.py` and `job` is the `Job` instance. + + For multiple plugins, the code will be inserted in the order of the plugin priority. + + The code will replace the `#![jobcmd_init]` placeholder in the wrapped job script. + See also + + Args: + job: The job object + + Returns: + The bash code to be inserted + """ + + +@plugin.spec(result=SimplugResult.ALL_AVAILS) +def on_jobcmd_prep(job: Job) -> str: + """When the job command right about to be run + + This should return a piece of bash code to be inserted in the wrapped job + script (template), which is a python template string, with the following + variables available: `status` and `job`. `status` is the class `JobStatus` from + `xqute.defaults.py` and `job` is the `Job` instance. + + The bash variable `$cmd` is accessible in the context. It is also possible to + modify the `cmd` variable. Just remember to assign the modified value to `cmd`. + + For multiple plugins, the code will be inserted in the order of the plugin priority. + Keep in mind that the `$cmd` may be modified by other plugins. + + The code will replace the `#![jobcmd_prep]` placeholder in the wrapped job script. + See also + + Args: + job: The job object + + Returns: + The bash code to be inserted + """ + + +@plugin.spec(result=SimplugResult.ALL_AVAILS) +def on_jobcmd_end(job: Job) -> str: + """When the job command finishes and after the postscript is run + + This should return a piece of bash code to be inserted in the wrapped job + script (template), which is a python template string, with the following + variables available: `status` and `job`. `status` is the class `JobStatus` from + `xqute.defaults.py` and `job` is the `Job` instance. + + The bash variable `$rc` is accessible in the context, which is the return code + of the job command. + + For multiple plugins, the code will be inserted in the order of the plugin priority. + + The code will replace the `#![jobcmd_end]` placeholder in the wrapped job script. + See also + + Args: + job: The job object + + Returns: + The bash code to be inserted + """ + + class PipenMainPlugin: """The builtin core plugin, used to update the progress bar and cache the job""" @@ -653,5 +726,29 @@ async def on_job_failed(self, scheduler: Scheduler, job: Job): """When a job is failed""" await plugin.hooks.on_job_failed(job) + @xqute_plugin.impl + def on_jobcmd_init(self, scheduler: Scheduler, job: Job): + """When the job command wrapper script is initialized""" + codes = plugin.hooks.on_jobcmd_init(job) + if not codes: + return None + return "\n\n".join(codes) + + @xqute_plugin.impl + def on_jobcmd_prep(self, scheduler: Scheduler, job: Job): + """When the job command is about to be run""" + codes = plugin.hooks.on_jobcmd_prep(job) + if not codes: + return None + return "\n\n".join(codes) + + @xqute_plugin.impl + def on_jobcmd_end(self, scheduler: Scheduler, job: Job): + """When the job command finishes""" + codes = plugin.hooks.on_jobcmd_end(job) + if not codes: + return None + return "\n\n".join(codes) + xqute_plugin.register(XqutePipenPlugin) diff --git a/tests/test_plugin.py b/tests/test_plugin.py index f42730a8..8695b691 100644 --- a/tests/test_plugin.py +++ b/tests/test_plugin.py @@ -223,3 +223,37 @@ class IOProc4(IOProc3): # # assert pipen.run() assert test_outdir.joinpath("out.txt").exists() assert test_outdir.joinpath("out.txt").read_text() == "abcd" + + +@pytest.mark.forked +def test_jobcmd_hooks(pipen): + + @plugin.register + class MyJobCmdPlugin: + @plugin.impl + def on_jobcmd_init(job): + return "# on_jobcmd_init from myjobcmdplugin" + + @plugin.impl + def on_jobcmd_prep(job): + return "# on_jobcmd_prep from myjobcmdplugin" + + @plugin.impl + def on_jobcmd_end(job): + return "# on_jobcmd_end from myjobcmdplugin" + + class MyProc(Proc): + input = "in:var" + input_data = [1] + output = "out:var:{{in.in}}" + script = "echo {{proc.name}}" + + pipen.set_starts(MyProc).run() + assert pipen.run() + + wrapper_script = pipen.workdir / "MyProc" / "0" / "job.wrapped.local" + assert wrapper_script.exists() + content = wrapper_script.read_text() + assert "# on_jobcmd_init from myjobcmdplugin" in content + assert "# on_jobcmd_prep from myjobcmdplugin" in content + assert "# on_jobcmd_end from myjobcmdplugin" in content