Skip to content

Commit

Permalink
0.8.0 (#160)
Browse files Browse the repository at this point in the history
* 🎨 Don't slugify pipen or proc names anymore but require them to be valid filenames

* πŸ› Fix process names being reused

* πŸ“ Update documentation with new job caching callback.

* 🎨 Move actions to on_job_cached hook for cached jobs

* ⬆️ Drop support for python3.7

* πŸ”– 0.8.0
  • Loading branch information
pwwang authored Apr 11, 2023
1 parent d101acf commit db91de3
Show file tree
Hide file tree
Showing 14 changed files with 145 additions and 192 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ jobs:
strategy:
matrix:
# 3.11 is slow to install
python-version: [3.7, 3.8, 3.9, "3.10"]
# python-version: [3.7, 3.8, 3.9, "3.10", "3.11"]
python-version: [3.8, 3.9, "3.10"]
# python-version: [3.8, 3.9, "3.10", "3.11"]

steps:
- uses: actions/checkout@v3
Expand Down
8 changes: 8 additions & 0 deletions docs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Change Log

## 0.8.0

- ⬆️ Drop support for python3.7
- 🎨 Don't slugify pipen or proc names anymore but require them to be valid filenames
- πŸ› Fix process names being reused
- πŸ“ Update documentation with new job caching callback.
- 🎨 Move actions to on_job_cached hook for cached jobs

## 0.7.3

- ✨ Add `--list` for `pipen profile` to list the names of available profiles
Expand Down
4 changes: 4 additions & 0 deletions docs/plugin.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ See [`simplug`][1] for more details.

When a job completes successfully

- `on_job_cached(proc, job)` (async)

When a job is cached

- `on_job_failed(proc, job)` (async)

When a job is done but failed (i.e. return_code == 1).
Expand Down
1 change: 0 additions & 1 deletion pipen/cli/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ def exec_command(self, args: Namespace) -> None:
for pkg in (
"liquidpy",
"pandas",
"python-slugify",
"enlighten",
"argx",
"xqute",
Expand Down
2 changes: 1 addition & 1 deletion pipen/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,5 @@ class ConfigurationError(PipenException):
"""When something wrong set as configuration"""


class ProcWorkdirConflictException(PipenException):
class PipenOrProcNameError(PipenException):
""" "When more than one processes are sharing the same workdir"""
23 changes: 19 additions & 4 deletions pipen/pipen.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
from rich.text import Text
from rich.console import Group
from simpleconf import ProfileConfig
from slugify import slugify # type: ignore
from varname import varname, VarnameException

from .defaults import CONFIG, CONFIG_FILES
from .exceptions import ProcDependencyError, PipenSetDataError
from .exceptions import (
PipenOrProcNameError,
ProcDependencyError,
PipenSetDataError,
)
from .pluginmgr import plugin
from .proc import Proc
from .progressbar import PipelinePBar
Expand All @@ -29,6 +32,7 @@
desc_from_docstring,
get_logpanel_width,
get_plugin_context,
is_valid_name,
log_rich_renderable,
logger,
pipen_banner,
Expand Down Expand Up @@ -92,6 +96,12 @@ def __init__(
except VarnameException:
self.name = f"pipen-{self.__class__.PIPELINE_COUNT}"

if not is_valid_name(self.name):
raise PipenOrProcNameError(
f"Invalid pipeline name: {self.name}, "
r"expecting '^[\w.-]$'"
)

self.desc = (
desc
or self.__class__.desc
Expand All @@ -100,7 +110,7 @@ def __init__(
self.outdir = Path(
outdir
or self.__class__.outdir
or f"./{slugify(self.name)}_results"
or f"./{self.name}_results"
).resolve()
self.workdir: Path = None
self.profile: str = "default"
Expand Down Expand Up @@ -164,7 +174,7 @@ async def async_run(self, profile: str = "default") -> bool:
True if the pipeline ends successfully else False
"""
self.profile = profile
self.workdir = Path(self.config.workdir) / slugify(self.name)
self.workdir = Path(self.config.workdir) / self.name
self.workdir.mkdir(parents=True, exist_ok=True)

succeeded = True
Expand Down Expand Up @@ -457,6 +467,11 @@ def build_proc_relationships(self) -> None:
f"Cyclic dependency: {proc.name}"
)

if proc.name in [p.name for p in self.procs]:
raise PipenOrProcNameError(
f"'{proc.name}' is already used by another process."
)

# Add proc to self.procs if all their requires
# are added to self.procs
# Then remove proc from nexts
Expand Down
11 changes: 11 additions & 0 deletions pipen/pluginmgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,9 @@ class PipenMainPlugin:
cache the job"""

name = "main"
# The priority is set to -1000 to make sure it is the first plugin
# to be called
order = -1000

@plugin.impl
def on_proc_shutdown(self, proc: Proc, sig: signal.Signals):
Expand All @@ -260,6 +263,14 @@ async def on_job_running(self, proc: Proc, job: Job):
"""Update the progress bar when a job starts to run"""
proc.pbar.update_job_running()

@plugin.impl
async def on_job_cached(self, proc: Proc, job: Job):
"""Update the progress bar when a job is cached"""
proc.pbar.update_job_submitted()
proc.pbar.update_job_running()
proc.pbar.update_job_succeeded()
job.status = JobStatus.FINISHED

@plugin.impl
async def on_job_succeeded(self, proc: Proc, job: Job):
"""Cache the job and update the progress bar when a job is succeeded"""
Expand Down
33 changes: 9 additions & 24 deletions pipen/proc.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
from diot import Diot
from rich import box
from rich.panel import Panel
from slugify import slugify # type: ignore
from varname import VarnameException, varname
from xqute import JobStatus, Xqute

Expand All @@ -31,7 +30,7 @@
ProcInputKeyError,
ProcInputTypeError,
ProcScriptFileNotFound,
ProcWorkdirConflictException,
PipenOrProcNameError,
)
from .pluginmgr import plugin
from .scheduler import get_scheduler
Expand All @@ -44,6 +43,7 @@
get_logpanel_width,
ignore_firstline_dedent,
is_subclass,
is_valid_name,
log_rich_renderable,
logger,
make_df_colnames_unique_inplace,
Expand Down Expand Up @@ -294,6 +294,12 @@ def __init_subclass__(cls) -> None:
if cls.name is None or (parent and cls.name == parent.name):
cls.name = cls.__name__

if not is_valid_name(cls.name):
raise PipenOrProcNameError(
f"{cls.name} is not a valid process name, expecting "
r"'^[\w.-]+$'"
)

cls.envs = update_dict(parent.envs if parent else None, cls.envs)
cls.plugin_opts = update_dict(
parent.plugin_opts if parent else None,
Expand All @@ -319,10 +325,7 @@ def __init__(self, pipeline: Pipen = None) -> None:
self.pbar = None
self.jobs: List[Any] = []
self.xqute = None
self.__class__.workdir = Path(self.pipeline.workdir) / slugify(
self.name
)

self.__class__.workdir = Path(self.pipeline.workdir) / self.name
# plugins can modify some default attributes
plugin.hooks.on_proc_init(self)

Expand Down Expand Up @@ -360,21 +363,7 @@ def __init__(self, pipeline: Pipen = None) -> None:
)
# script
self.script = self._compute_script() # type: ignore

# check if it's the same proc using the workdir
# since the directory name is slugified
proc_name_file = self.workdir / "proc.name" # type: ignore
if (
proc_name_file.is_file()
and proc_name_file.read_text() != self.name
):
raise ProcWorkdirConflictException(
"Workdir name is conflicting with process "
f"{proc_name_file.read_text()!r}, use a differnt pipeline "
"workdir or a different process name."
)
self.workdir.mkdir(exist_ok=True)
proc_name_file.write_text(self.name)

if self.submission_batch is None:
self.submission_batch = self.pipeline.config.submission_batch
Expand Down Expand Up @@ -455,10 +444,6 @@ async def run(self) -> None:
for job in self.jobs:
if await job.cached:
cached_jobs.append(job.index)
self.pbar.update_job_submitted()
self.pbar.update_job_running()
self.pbar.update_job_succeeded()
job.status = JobStatus.FINISHED
await plugin.hooks.on_job_cached(self, job)
else:
await self.xqute.put(job)
Expand Down
13 changes: 13 additions & 0 deletions pipen/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Provide some utilities"""
from __future__ import annotations

import re
import sys
import logging
import textwrap
Expand Down Expand Up @@ -558,3 +559,15 @@ def get_marked(proc: Type[Proc], mark_name: str, default: Any = None) -> Any:
The marked value
"""
return proc.__meta__.get(mark_name, default)


def is_valid_name(name: str) -> bool:
"""Check if a name is valid for a proc or pipen
Args:
name: The name to check
Returns:
True if valid, otherwise False
"""
return re.match(r"^[\w.-]+$", name) is not None
2 changes: 1 addition & 1 deletion pipen/version.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""Provide version of pipen"""

__version__ = "0.7.3"
__version__ = "0.8.0"
Loading

0 comments on commit db91de3

Please sign in to comment.