Skip to content

Commit

Permalink
feat: expose on_jobcmd_* hooks for plugins to modify the job wrapper …
Browse files Browse the repository at this point in the history
…script
  • Loading branch information
pwwang committed Aug 1, 2024
1 parent 86af354 commit db53374
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 0 deletions.
49 changes: 49 additions & 0 deletions docs/plugin.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,55 @@ See [`simplug`][1] for more details.

When a job is done but failed (i.e. return_code == 1).

- `on_jobcmd_init(job) -> str` (sync)

When the job command wrapper script is initialized before the prescript is run

This should return a piece of bash code to be inserted in the wrapped job
script (template), which is a python template string, with the following
variables available: `status` and `job`. `status` is the class `JobStatus` from
`xqute.defaults.py` and `job` is the `Job` instance.

For multiple plugins, the code will be inserted in the order of the plugin priority.

The code will replace the `#![jobcmd_init]` placeholder in the wrapped job script.
See also <https://github.com/pwwang/xqute/blob/master/xqute/defaults.py#L95>

- `on_jobcmd_prep(job) -> str` (sync)

When the job command right about to be run

This should return a piece of bash code to be inserted in the wrapped job
script (template), which is a python template string, with the following
variables available: `status` and `job`. `status` is the class `JobStatus` from
`xqute.defaults.py` and `job` is the `Job` instance.

The bash variable `$cmd` is accessible in the context. It is also possible to
modify the `cmd` variable. Just remember to assign the modified value to `cmd`.

For multiple plugins, the code will be inserted in the order of the plugin priority.
Keep in mind that the `$cmd` may be modified by other plugins.

The code will replace the `#![jobcmd_prep]` placeholder in the wrapped job script.
See also <https://github.com/pwwang/xqute/blob/master/xqute/defaults.py#L95>

- `on_jobcmd_end(job) -> str` (sync):

When the job command finishes and after the postscript is run

This should return a piece of bash code to be inserted in the wrapped job
script (template), which is a python template string, with the following
variables available: `status` and `job`. `status` is the class `JobStatus` from
`xqute.defaults.py` and `job` is the `Job` instance.

The bash variable `$rc` is accessible in the context, which is the return code
of the job command.

For multiple plugins, the code will be inserted in the order of the plugin priority.

The code will replace the `#![jobcmd_end]` placeholder in the wrapped job script.
See also <https://github.com/pwwang/xqute/blob/master/xqute/defaults.py#L95>

#### IO hooks

The io hooks are used to handle the input/output files/directories. The idea is to provide more flexibility to fetch the last modified time of the files/directories, and remove the files/directories when the job restarts. The APIs of these types of plugins are primarily used to generate the cache signature for the jobs. There are 5 APIs that need to be implemented:
Expand Down
97 changes: 97 additions & 0 deletions pipen/pluginmgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,79 @@ async def output_exists(job: Job, path: str, is_dir: bool) -> bool:
"""


@plugin.spec(result=SimplugResult.ALL_AVAILS)
def on_jobcmd_init(job: Job) -> str:
"""When the job command wrapper script is initialized before the prescript is run
This should return a piece of bash code to be inserted in the wrapped job
script (template), which is a python template string, with the following
variables available: `status` and `job`. `status` is the class `JobStatus` from
`xqute.defaults.py` and `job` is the `Job` instance.
For multiple plugins, the code will be inserted in the order of the plugin priority.
The code will replace the `#![jobcmd_init]` placeholder in the wrapped job script.
See also <https://github.com/pwwang/xqute/blob/master/xqute/defaults.py#L95>
Args:
job: The job object
Returns:
The bash code to be inserted
"""


@plugin.spec(result=SimplugResult.ALL_AVAILS)
def on_jobcmd_prep(job: Job) -> str:
"""When the job command right about to be run
This should return a piece of bash code to be inserted in the wrapped job
script (template), which is a python template string, with the following
variables available: `status` and `job`. `status` is the class `JobStatus` from
`xqute.defaults.py` and `job` is the `Job` instance.
The bash variable `$cmd` is accessible in the context. It is also possible to
modify the `cmd` variable. Just remember to assign the modified value to `cmd`.
For multiple plugins, the code will be inserted in the order of the plugin priority.
Keep in mind that the `$cmd` may be modified by other plugins.
The code will replace the `#![jobcmd_prep]` placeholder in the wrapped job script.
See also <https://github.com/pwwang/xqute/blob/master/xqute/defaults.py#L95>
Args:
job: The job object
Returns:
The bash code to be inserted
"""


@plugin.spec(result=SimplugResult.ALL_AVAILS)
def on_jobcmd_end(job: Job) -> str:
"""When the job command finishes and after the postscript is run
This should return a piece of bash code to be inserted in the wrapped job
script (template), which is a python template string, with the following
variables available: `status` and `job`. `status` is the class `JobStatus` from
`xqute.defaults.py` and `job` is the `Job` instance.
The bash variable `$rc` is accessible in the context, which is the return code
of the job command.
For multiple plugins, the code will be inserted in the order of the plugin priority.
The code will replace the `#![jobcmd_end]` placeholder in the wrapped job script.
See also <https://github.com/pwwang/xqute/blob/master/xqute/defaults.py#L95>
Args:
job: The job object
Returns:
The bash code to be inserted
"""


class PipenMainPlugin:
"""The builtin core plugin, used to update the progress bar and
cache the job"""
Expand Down Expand Up @@ -653,5 +726,29 @@ async def on_job_failed(self, scheduler: Scheduler, job: Job):
"""When a job is failed"""
await plugin.hooks.on_job_failed(job)

@xqute_plugin.impl
def on_jobcmd_init(self, scheduler: Scheduler, job: Job):
"""When the job command wrapper script is initialized"""
codes = plugin.hooks.on_jobcmd_init(job)
if not codes:
return None
return "\n\n".join(codes)

@xqute_plugin.impl
def on_jobcmd_prep(self, scheduler: Scheduler, job: Job):
"""When the job command is about to be run"""
codes = plugin.hooks.on_jobcmd_prep(job)
if not codes:
return None
return "\n\n".join(codes)

@xqute_plugin.impl
def on_jobcmd_end(self, scheduler: Scheduler, job: Job):
"""When the job command finishes"""
codes = plugin.hooks.on_jobcmd_end(job)
if not codes:
return None
return "\n\n".join(codes)


xqute_plugin.register(XqutePipenPlugin)
34 changes: 34 additions & 0 deletions tests/test_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,3 +223,37 @@ class IOProc4(IOProc3):
# # assert pipen.run()
assert test_outdir.joinpath("out.txt").exists()
assert test_outdir.joinpath("out.txt").read_text() == "abcd"


@pytest.mark.forked
def test_jobcmd_hooks(pipen):

@plugin.register
class MyJobCmdPlugin:
@plugin.impl
def on_jobcmd_init(job):
return "# on_jobcmd_init from myjobcmdplugin"

@plugin.impl
def on_jobcmd_prep(job):
return "# on_jobcmd_prep from myjobcmdplugin"

@plugin.impl
def on_jobcmd_end(job):
return "# on_jobcmd_end from myjobcmdplugin"

class MyProc(Proc):
input = "in:var"
input_data = [1]
output = "out:var:{{in.in}}"
script = "echo {{proc.name}}"

pipen.set_starts(MyProc).run()
assert pipen.run()

wrapper_script = pipen.workdir / "MyProc" / "0" / "job.wrapped.local"
assert wrapper_script.exists()
content = wrapper_script.read_text()
assert "# on_jobcmd_init from myjobcmdplugin" in content
assert "# on_jobcmd_prep from myjobcmdplugin" in content
assert "# on_jobcmd_end from myjobcmdplugin" in content

0 comments on commit db53374

Please sign in to comment.