Skip to content

Commit

Permalink
0.9.0 (#161)
Browse files Browse the repository at this point in the history
* ⬆️ Bump xqute to 0.2 so we have slurm and ssh schedulers available
🎨 Improve code for dropping python 3.7

* 👷 Use 3.10 as main python version in CI

* ✨ Add ssh and slurm scheduers

* 📝 Update docs for slurm and ssh schedulers

* 📝 Update documentation dependencies in requirements.txt

* 🔖 0.9.0
  • Loading branch information
pwwang authored Apr 13, 2023
1 parent db91de3 commit df99f23
Show file tree
Hide file tree
Showing 18 changed files with 178 additions and 200 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ jobs:
if: ${{ always() }}
- name: Run codacy-coverage-reporter
uses: codacy/codacy-coverage-reporter-action@master
if: matrix.python-version == 3.9
if: matrix.python-version == 3.10
with:
project-token: ${{ secrets.CODACY_PROJECT_TOKEN }}
coverage-reports: .coverage.xml
Expand All @@ -50,7 +50,7 @@ jobs:
if: github.event_name == 'push' && startsWith(github.ref, 'refs/tags')
strategy:
matrix:
python-version: [3.9]
python-version: ["3.10"]
steps:
- uses: actions/checkout@v3
- name: Setup Python # Set Python version
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/docs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
# if: github.ref == 'refs/heads/master'
strategy:
matrix:
python-version: [3.9]
python-version: ["3.10"]
steps:
- uses: actions/checkout@v3
- name: Setup Python # Set Python version
Expand Down Expand Up @@ -37,7 +37,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [3.9]
python-version: ["3.10"]
steps:
- uses: actions/checkout@v3
with:
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ if __name__ == "__main__":
[09/13/21 04:23:37] I main _ ____/__/ / _ ____/_ /___ _ /| /
[09/13/21 04:23:37] I main /_/ /___/ /_/ /_____/ /_/ |_/
[09/13/21 04:23:37] I main
[09/13/21 04:23:37] I main version: 0.7.0
[09/13/21 04:23:37] I main version: 0.9.0
[09/13/21 04:23:37] I main
[09/13/21 04:23:37] I main ╭═════════════════════════════ MYPIPELIN ═══════════════════════════════╮
[09/13/21 04:23:37] I main ║ # procs = 2 ║
Expand Down
8 changes: 8 additions & 0 deletions docs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Change Log

## 0.9.0

- ⬆️ Bump xqute to 0.2 so we can have slurm and ssh schedulers available
- ✨ Add ssh and slurm scheduers
- 🎨 Improve code for dropping python 3.7
- 👷 Use 3.10 as main python version in CI
- 📝 Update docs for slurm and ssh schedulers

## 0.8.0

- ⬆️ Drop support for python3.7
Expand Down
9 changes: 3 additions & 6 deletions docs/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
# use_directory_urls doesn't work for newer versions
mkdocs==1.1.2
# AttributeError: module 'jinja2' has no attribute 'contextfilter'
# jinja2==3.1.0
jinja2==3.0.3
mkdocs-material==7.2.3
mkdocs
jinja2
mkdocs-material
pymdown-extensions
mkapi-fix
24 changes: 20 additions & 4 deletions docs/scheduler.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,34 +17,50 @@ Send the jobs to run on `sge` scheduler.

The `scheduler_opts` will be the ones supported by `qsub`.

### `slurm`

Send the jobs to run on `slurm` scheduler.

The `scheduler_opts` will be the ones supported by `sbatch`.

### `ssh`

Send the jobs to run on a remote machine via `ssh`.

The `scheduler_opts` will be the ones supported by `ssh`.

See also [xqute][1].

## Writing your own scheduler plugin

To write a scheduler plugin, you need to subclass `xqute.schedulers.scheduler.Scheduler`.

You may also want to implement a class for jobs, by subclassing `xqute.schedulers.job.Job`, and assign it to the class variable `job_class` to your `xqute.schedulers.scheduler.Scheduler` subclass.

For examples of a scheduler plugin, see [local_scheduler][2] and [sge_scheduler][3].
For examples of a scheduler plugin, see [local_scheduler][2], [sge_scheduler][3], [slurm_scheduler][4], and [ssh_scheduler][5].

The `xqute.schedulers.scheduler.Scheduler` subclass can be passed to `scheduler` configuration directly to be used as a scheduler. But you can also register it with entry points:

For `setup.py`, you will need:
```python
setup(
# ...
entry_points={"pipen_sched": ["slurm = pipen_slurm"]},
entry_points={"pipen_sched": ["mysched = pipen_mysched"]},
# ...
)
```

For `pyproject.toml`:
```toml
[tool.poetry.plugins.pipen_sched]
slurm = "pipen_slurm"
mysched = "pipen_mysched"
```

Then you can switch the scheduler to slurm by `scheduler="slurm"`
Then you can switch the scheduler to `mysched` by `scheduler="mysched"`


[1]: https://github.com/pwwang/xqute
[2]: https://github.com/pwwang/xqute/blob/master/xqute/schedulers/local_scheduler.py
[3]: https://github.com/pwwang/xqute/blob/master/xqute/schedulers/sge_scheduler.py
[4]: https://github.com/pwwang/xqute/blob/master/xqute/schedulers/slurm_scheduler.py
[5]: https://github.com/pwwang/xqute/blob/master/xqute/schedulers/ssh_scheduler/
12 changes: 2 additions & 10 deletions pipen/cli/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,6 @@
__all__ = ("CLIVersionPlugin",)


def get_pkg_version(pkg: str) -> str:
try:
from importlib.metadata import version
return version(pkg)
except ImportError: # pragma: no cover
from pkg_resources import get_distribution # type: ignore
return get_distribution(pkg).version


class CLIVersionPlugin(CLIPlugin):
"""Print versions of pipen and its dependencies"""

Expand All @@ -29,6 +20,7 @@ class CLIVersionPlugin(CLIPlugin):
def exec_command(self, args: Namespace) -> None:
"""Run the command"""
import sys
from importlib.metadata import version
from .. import __version__

versions = {"python": sys.version, "pipen": __version__}
Expand All @@ -43,7 +35,7 @@ def exec_command(self, args: Namespace) -> None:
"pipda",
"varname",
):
versions[pkg] = get_pkg_version(pkg)
versions[pkg] = version(pkg)

keylen = max(map(len, versions))
for key in versions:
Expand Down
5 changes: 0 additions & 5 deletions pipen/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,6 @@

from diot import Diot
from xqute import JobErrorStrategy
from xqute import logger as xqute_logger

# turn xqute's logger off
xqute_logger.setLevel(100)
xqute_logger.removeHandler(xqute_logger.handlers[0])

LOGGER_NAME = "main"
CONFIG_FILES = (
Expand Down
5 changes: 3 additions & 2 deletions pipen/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@

import logging
import shlex
import shutil
from functools import cached_property
from os import PathLike
from pathlib import Path
import shutil
from typing import TYPE_CHECKING, Any, Dict, Mapping

from diot import OrderedDiot
Expand All @@ -22,7 +23,7 @@
TemplateRenderingError,
)
from .template import Template
from .utils import cached_property, logger, strsplit
from .utils import logger, strsplit

if TYPE_CHECKING: # pragma: no cover
from .proc import Proc
Expand Down
2 changes: 1 addition & 1 deletion pipen/pipen.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ async def async_run(self, profile: str = "default") -> bool:
"This is a start process, "
"but no 'input_data' specified.",
)
await proc_obj._init()
await proc_obj.init()
await proc_obj.run()
if proc_obj.succeeded:
self.pbar.update_proc_done()
Expand Down
9 changes: 4 additions & 5 deletions pipen/proc.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@

import asyncio
import inspect
from abc import ABC, ABCMeta
import logging
from abc import ABC, ABCMeta
from functools import cached_property
from os import PathLike
from pathlib import Path
from typing import (
Expand All @@ -17,8 +18,6 @@
TYPE_CHECKING,
)

# Slow down the import, try do it at runtime
# import pandas
from diot import Diot
from rich import box
from rich.panel import Panel
Expand All @@ -37,7 +36,6 @@
from .template import Template, get_template_engine
from .utils import (
brief_list,
cached_property,
copy_dict,
desc_from_docstring,
get_logpanel_width,
Expand Down Expand Up @@ -368,7 +366,7 @@ def __init__(self, pipeline: Pipen = None) -> None:
if self.submission_batch is None:
self.submission_batch = self.pipeline.config.submission_batch

async def _init(self) -> None:
async def init(self) -> None:
"""Init all other properties and jobs"""
import pandas

Expand All @@ -379,6 +377,7 @@ async def _init(self) -> None:
self.xqute = Xqute(
self.scheduler,
job_metadir=self.workdir,
loglevel="NOTSET",
job_submission_batch=self.submission_batch,
job_error_strategy=self.error_strategy
or self.pipeline.config.error_strategy,
Expand Down
3 changes: 1 addition & 2 deletions pipen/procgroup.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,11 @@
from __future__ import annotations

from os import PathLike
from functools import wraps
from functools import wraps, cached_property
from typing import Callable, Type, List
from abc import ABC, ABCMeta
from diot import Diot

from .utils import cached_property
from .pipen import Pipen
from .proc import Proc

Expand Down
61 changes: 44 additions & 17 deletions pipen/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,43 +4,65 @@
from typing import Type

from xqute import Scheduler
from xqute.schedulers.local_scheduler import LocalJob as XquteLocalJob
from xqute.schedulers.local_scheduler import (
LocalJob as XquteLocalJob,
LocalScheduler as XquteLocalScheduler,
)
from xqute.schedulers.sge_scheduler import SgeJob as XquteSgeJob
from xqute.schedulers.sge_scheduler import SgeScheduler as XquteSgeScheduler
from xqute.schedulers.sge_scheduler import (
SgeJob as XquteSgeJob,
SgeScheduler as XquteSgeScheduler
)
from xqute.schedulers.slurm_scheduler import (
SlurmJob as XquteSlurmJob,
SlurmScheduler as XquteSlurmScheduler,
)
from xqute.schedulers.ssh_scheduler import (
SshJob as XquteSshJob,
SshScheduler as XquteSshScheduler,
)

from .defaults import SCHEDULER_ENTRY_GROUP
from .exceptions import NoSuchSchedulerError, WrongSchedulerTypeError
from .job import Job
from .utils import is_subclass, load_entrypoints


class LocalJob(Job):
class LocalJob(XquteLocalJob, Job):
"""Job class for local scheduler"""

wrap_cmd = XquteLocalJob.wrap_cmd


class LocalScheduler(XquteLocalScheduler):
"""Local scheduler"""

job_class = LocalJob


class SgeJob(Job):
"""Job class for sge scheduler"""

wrap_cmd = XquteSgeJob.wrap_cmd
class SgeJob(XquteSgeJob, Job):
"""Job class for SGE scheduler"""


class SgeScheduler(XquteSgeScheduler):
"""Sge scheduler"""

"""SGE scheduler"""
job_class = SgeJob


class SlurmJob(XquteSlurmJob, Job):
"""Job class for Slurm scheduler"""


class SlurmScheduler(XquteSlurmScheduler):
"""Slurm scheduler"""
job_class = SlurmJob


class SshJob(XquteSshJob, Job):
"""Job class for SSH scheduler"""


class SshScheduler(XquteSshScheduler):
"""SSH scheduler"""
job_class = SshJob


def get_scheduler(scheduler: str | Type[Scheduler]) -> Type[Scheduler]:
"""Get the scheduler by name of the scheduler class itself
Expand All @@ -55,13 +77,18 @@ def get_scheduler(scheduler: str | Type[Scheduler]) -> Type[Scheduler]:

if scheduler == "local":
return LocalScheduler

if scheduler == "sge":
return SgeScheduler

for name, obj in load_entrypoints(
SCHEDULER_ENTRY_GROUP
): # pragma: no cover
if name == scheduler:
if scheduler == "slurm":
return SlurmScheduler

if scheduler == "ssh":
return SshScheduler

for n, obj in load_entrypoints(SCHEDULER_ENTRY_GROUP): # pragma: no cover
if n == scheduler:
if not is_subclass(obj, Scheduler):
raise WrongSchedulerTypeError(
"Scheduler should be a subclass of "
Expand Down
Loading

0 comments on commit df99f23

Please sign in to comment.