3 changes: 2 additions & 1 deletion .github/requirements-old.txt
@@ -1,2 +1,3 @@
# Ensure changes to these dependencies are reflected in pyproject.toml
stepup==3.1.3
path==16.14.0
stepup==3.2.0
23 changes: 23 additions & 0 deletions docs/changelog.md
@@ -12,6 +12,29 @@ and this project adheres to [Effort-based Versioning](https://jacobtomlinson.dev

(no changes yet)

### Added

- New `stepup removejobs` command to remove job directories,
by default only those of failed jobs.
This command uses the same safeguards as `stepup clean`
in the upcoming StepUp Core 3.2 release, i.e.,
it only performs destructive actions when explicitly confirmed by the user
with the `--commit` flag.
- Detect unsupported scheduler directives in job scripts
(e.g., PBS, LSF, Cobalt) and raise an error.

### Changed

- Refactored `stepup canceljobs` to use the same safeguards as `stepup clean`
in the upcoming StepUp Core 3.2 release.

### Fixed

- Added a missing dependency and fixed an inconsistency with `.github/requirements-old.txt`.
- Filter jobs by status in `stepup canceljobs`,
so it only cancels jobs that are not done, unless the `--all` flag is used.
- Fixed a mistake in the regular expressions that detect forbidden `#SBATCH` options.

## [1.0.7][] - 2025-12-07 {: #v1.0.7 }

Improved robustness for workflows with many concurrent jobs.
69 changes: 55 additions & 14 deletions docs/usage.md
@@ -108,25 +108,66 @@ and runs `stepup shutdown` twice in that directory via `ssh`.
The way the software environment is activated may differ from your setup.

A less sophisticated approach is to simply cancel the StepUp job via `scancel <jobid>`.
This will stop StepUp immediately but it may not have the chance to write its state to disk.
This will stop StepUp immediately, but it may not have the chance to write its state to disk.
Normally, this should be fine, as it uses SQLite, which is robust against crashes.

## Killing Running Jobs

If you need to cancel all running SLURM jobs, typically after interrupting StepUp,
you can run the following command from the top-level directory of the workflow:
StepUp Queue will not cancel any jobs when the workflow is interrupted.
It is quite common for a workflow to be interrupted by accident or for technical reasons.
In this case, it would be inefficient to also cancel running jobs, which may still be doing useful work.
Instead, jobs continue to run, and you can restart the StepUp workflow to pick up where it left off.

If you want to cancel running SLURM jobs, typically after interrupting StepUp,
you can run the following command:

```bash
stepup canceljobs
stepup canceljobs dir/to/running/jobs
```

StepUp Queue will not automatically cancel jobs when the workflow is interrupted.
It is quite common for a workflow to be interrupted by accident or for technical reasons.
In this case, it would be inefficient to also cancel running jobs, which may still be doing useful work.
Instead, jobs continue to run and you can restart the StepUp workflow to pick up where it left off.
This command will recursively look for all `slurmjob.log` files in the given paths
(or the current directory if no paths are given),
extract the corresponding job IDs of running jobs, and generate `scancel` commands to cancel them.
After each `scancel` command, the path of the `slurmjob.log` file
and the job status are added as a comment.
For example, the output may look like this:

```bash
scancel 123456 # path/to/job1/slurmjob.log RUNNING
scancel -M somecluster 123457 # path/to/job2/slurmjob.log PENDING
```

By default, this command will not perform any destructive actions
and will only print the `scancel` commands that would be executed.
You can pass the `--commit` option to actually execute the `scancel` commands.
Alternatively, you can select a subset of jobs to cancel with `grep`, for example:

```bash
stepup canceljobs dir/to/running/jobs | grep "filename_pattern"
```

Make sure you always check the generated `scancel` commands before executing them.
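
For example, one possible way to act on such a filtered selection is to save the output,
inspect it, and only then feed it to a shell.
The lines printed without `--commit` are plain `scancel` commands
(the trailing `#` comment is ignored by the shell),
and `cancel.sh` below is just an example file name:

```bash
# Save the filtered scancel commands, review them, then execute them.
stepup canceljobs dir/to/running/jobs | grep "filename_pattern" > cancel.sh
less cancel.sh   # inspect the commands before running anything
bash cancel.sh
```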

## Removing Directories of Cancelled or Failed Jobs

After a job has been cancelled or has failed,
the corresponding files are not removed automatically.
This is to allow for inspection of the job's output and error files for debugging purposes.
You can remove the directories of cancelled or failed jobs
by running the following command:

```bash
stepup removejobs dir/to/jobs
```

This command will recursively look for all `slurmjob.log` files in the given paths
(or the current directory if no paths are given),
check the status of the corresponding jobs,
and remove the directories of jobs that have ended but were not successful.

After having cancelled jobs, it is still your responsibility to clean up files in the workflow.
Removing them is not always desirable, so this is not done automatically.
By default, this command will not perform any destructive actions
and will only print the remove commands that would be executed.
You can pass the `--commit` option to actually remove the directories.
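
For example, a typical session could first do a dry run, review the output, and then commit
(the path below is only a placeholder):

```bash
# Dry run: only print what would be removed, without deleting anything.
stepup removejobs dir/to/jobs
# After checking the output, actually remove the directories of unsuccessful jobs.
stepup removejobs --commit dir/to/jobs
```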

## Useful Settings when Developing Workflows

@@ -158,7 +199,7 @@ wait_for_file() {
# Waiting function to deal with file synchronization issues.
local file="$1"
local timeout="${2:-600}" # timeout in seconds (default 10 min)
local interval="${5:-2}" # poll interval in seconds (default 5 sec)
local interval="${3:-5}" # poll interval in seconds (default 5 sec)

local elapsed=0

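# The rest of this helper is elided in the diff above.
# The loop below is only an illustrative sketch of how the polling could look,
# not the actual implementation: wait until the file exists or the timeout expires.
while [ ! -e "$file" ]; do
    if [ "$elapsed" -ge "$timeout" ]; then
        echo "Timed out after ${timeout}s waiting for ${file}" >&2
        return 1
    fi
    sleep "$interval"
    elapsed=$((elapsed + interval))
done
return 0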
@@ -194,7 +235,7 @@ its submission time, job ID, and previous states.

The status of the jobs is inferred from `sacct -o 'jobidraw,state' -PXn`,
if relevant with a `--cluster` argument.
In addition a configurable `-S` argument is passed to `sacct`.
In addition, a configurable `-S` argument is passed to `sacct`.
Its output is cached in a subdirectory `.stepup/queue` of the workflow root.
The cached result is reused by all `sbatch` actions,
so the number of `sacct` calls is independent of the
@@ -205,8 +246,8 @@ The time between two `sacct` calls (per cluster) can be controlled with the
Increase this value if you want to reduce the burden on SLURM.

The cached output of `sacct` is checked by the `sbatch` actions with a randomized polling interval.
If any of these actions needs notices that the cached file is too old,
it will aquire a lock on the cache file and update it by calling `sacct`.
If any of these actions notices that the cached file is too old,
it will acquire a lock on the cache file and update it by calling `sacct`.
The randomization guarantees that concurrent calls to `sacct` (for multiple clusters)
will not all coincide.
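
As a rough illustration of this pattern (not the actual implementation, which is part of the
Python `sbatch` action; the cache file name, lock file name, and age thresholds below are
made up for the example), the refresh logic is similar to the following shell sketch:

```bash
# Illustrative sketch only: refresh a cached sacct dump when it is older than a
# randomized threshold, under a file lock so concurrent refreshes do not collide.
cache=".stepup/queue/sacct-somecluster.txt"  # hypothetical cache file name
max_age=$((30 + RANDOM % 30))                # randomized age threshold in seconds
mtime=$(stat -c %Y "$cache" 2>/dev/null || echo 0)
if [ $(($(date +%s) - mtime)) -gt "$max_age" ]; then
    (
        flock -x 9
        # Re-check after acquiring the lock: another process may have refreshed
        # the cache while this one was waiting.
        mtime=$(stat -c %Y "$cache" 2>/dev/null || echo 0)
        if [ $(($(date +%s) - mtime)) -gt "$max_age" ]; then
            sacct -o 'jobidraw,state' -PXn > "$cache.tmp" && mv "$cache.tmp" "$cache"
        fi
    ) 9>"$cache.lock"
fi
```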
The polling time can be controlled with two additional environment variables:
4 changes: 3 additions & 1 deletion pyproject.toml
@@ -28,7 +28,8 @@ classifiers = [
]
dependencies = [
# Ensure changes to these dependencies are reflected in .github/requirements-old.txt
"stepup>=3.1.4,<4.0.0",
"path>=16.14.0",
"stepup>=3.2.0,<4.0.0",
]
dynamic = ["version"]

@@ -56,6 +57,7 @@ sbatch = "stepup.queue.actions:sbatch"

[project.entry-points."stepup.tools"]
canceljobs = "stepup.queue.canceljobs:canceljobs_subcommand"
removejobs = "stepup.queue.removejobs:removejobs_subcommand"

[tool.pytest.ini_options]
addopts = "-n auto --dist worksteal -W error -W ignore::ResourceWarning"
4 changes: 2 additions & 2 deletions stepup/queue/actions.py
@@ -28,7 +28,7 @@

from stepup.core.worker import WorkThread

from .canceljobs import read_jobid_cluster
from .canceljobs import read_jobid_cluster_status
from .sbatch import InpDigestError, submit_once_and_wait


@@ -48,7 +48,7 @@ def sbatch(argstr: str, work_thread: WorkThread) -> int:
return submit_once_and_wait(work_thread, args.ext, args.rc)
# Cancel running job (if any), clean log and resubmit
path_log = Path("slurmjob.log")
job_id, cluster = read_jobid_cluster(path_log)
job_id, cluster, _ = read_jobid_cluster_status(path_log)
if cluster is None:
work_thread.runsh(f"scancel {job_id}")
else:
110 changes: 62 additions & 48 deletions stepup/queue/canceljobs.py
@@ -25,67 +25,66 @@

from path import Path

from .sbatch import FIRST_LINE, parse_sbatch
from .sbatch import DONE_STATES, parse_sbatch, read_log, read_status
from .utils import search_jobs


def canceljobs_tool(args: argparse.Namespace):
if len(args.paths) == 0:
args.paths = [Path(".")]

# Iterate over all slurmjob.log files in the specified directories, and kill them.
job_ids = {}
for path in args.paths:
if not path.exists():
print(f"Path {path} does not exist.")
continue
if not path.is_dir():
print(f"Path {path} is not a directory.")
"""Iterate over all slurmjob.log files, read the SLURM job IDs, and cancel them."""
jobs = {}
for path_log in search_jobs(args.paths, verbose=True):
try:
job_id, cluster, status = read_jobid_cluster_status(path_log)
except ValueError as e:
print(f"# WARNING: Could not read job ID from {path_log}: {e}")
continue
print(f"Searching recursively in {path}")
paths_log = list(path.glob("**/slurmjob.log"))
if (path / "slurmjob.log").is_file():
paths_log.append(path / "slurmjob.log")
for job_log in paths_log:
try:
job_id, cluster = read_jobid_cluster(job_log)
msg = f"Found job {job_id} in {job_log}"
if cluster is not None:
msg += f" on cluster {cluster}"
print(msg)
job_ids.setdefault(cluster, []).append(job_id)
except ValueError as e:
print(f"Warning: Could not read job ID from {job_log}: {e}")
continue
if args.all or status not in DONE_STATES:
jobs.setdefault(cluster, []).append((job_id, path_log, status))

all_good = True
# Cancel at most 100 at a time to avoid exceeding the command line length limit,
# and to play nice with SLURM.
for cluster, cluster_job_ids in job_ids.items():
while len(cluster_job_ids) > 0:
cancel_ids = cluster_job_ids[:100]
cluster_job_ids[:] = cluster_job_ids[100:]
for cluster, cluster_jobs in jobs.items():
if args.commit:
# Cancel at most 100 at a time to avoid exceeding the command line length limit,
# and to play nice with SLURM.
while len(cluster_jobs) > 0:
cancel_jobs = cluster_jobs[:100]
cluster_jobs[:] = cluster_jobs[100:]

command_args = ["scancel"]
if cluster is not None:
command_args.extend(["-M", cluster])
command_args.extend(str(job_id) for job_id in cancel_ids)
command_args = ["scancel"]
if cluster is not None:
command_args.extend(["-M", cluster])
command_args.extend(str(job_id) for job_id, _, _ in cancel_jobs)

# Using subprocess.run for better control and error handling
print(f"Executing: {' '.join(command_args)}")
result = subprocess.run(command_args, check=False)
all_good &= result.returncode == 0
# Using subprocess.run for better control and error handling
print(" ".join(command_args))
result = subprocess.run(command_args, check=False)
all_good &= result.returncode == 0
else:
for job_id, path_log, status in cluster_jobs:
command = "scancel"
if cluster is not None:
command += f" -M {cluster}"
command += f" {job_id} # {path_log} {status}"
print(command)
if not all_good:
print("Some jobs could not be cancelled. See messages above.")
sys.exit(1)


def read_jobid_cluster(job_log: Path) -> tuple[str, str]:
"""Read the job ID and cluster from the job log file."""
with open(job_log) as f:
lines = f.readlines()
if len(lines) < 3 or lines[0][:-1] != FIRST_LINE:
raise ValueError(f"Invalid first line in {job_log}.")
return parse_sbatch(lines[2].split()[-1])
def read_jobid_cluster_status(path_log: str) -> tuple[int, str | None, str | None]:
"""Read the job ID, cluster, and job status from the job log file."""
lines = read_log(path_log, False)
if len(lines) < 1:
raise ValueError(f"Incomplete file: {path_log}.")
words = lines[0].split()
if len(words) != 3:
raise ValueError(f"Could not read job ID from first status line: {lines[0]}")
_, status, job_id_cluster = words
if status != "Submitted":
raise ValueError(f"No 'Submitted' on first status line: {lines[0]}")
job_id, cluster = parse_sbatch(job_id_cluster)
status = read_status(lines[-1:])[1]
return job_id, cluster, status


def canceljobs_subcommand(subparser: argparse.ArgumentParser) -> callable:
@@ -96,8 +95,23 @@ def canceljobs_subcommand(subparser: argparse.ArgumentParser) -> callable:
parser.add_argument(
"paths",
nargs="*",
default=[Path(".")],
type=Path,
help="Paths to the jobs to cancel. Subdirectories are searched recursively. "
"If not specified, the current directory is used.",
)
parser.add_argument(
"-c",
"--commit",
action="store_true",
default=False,
help="Execute the cancellation of jobs instead of only showing what would be done.",
)
parser.add_argument(
"-a",
"--all",
action="store_true",
default=False,
help="Select all jobs, including the ones that seem to be done already.",
)
return canceljobs_tool