Skip to content

Commit

Permalink
Add dry mode and improve option consistency
Browse files: browse the repository at this point in the history
  • Loading branch information
frthjf committed Apr 6, 2024
1 parent f0cf0ef commit 17ba254
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 59 deletions.
128 changes: 72 additions & 56 deletions docs/examples/mpi-execution/mpi.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from typing import Literal, Optional, Union

import shutil
import sys

from machinable import Execution
Expand All @@ -15,7 +14,9 @@ class Config(BaseModel):

preamble: Optional[str] = ""
mpi: Optional[str] = "mpirun"
python: Optional[str] = None
resume_failed: Union[bool, Literal["new", "skip"]] = False
dry: bool = False

def on_compute_default_resources(self, executable):
resources = {}
Expand All @@ -40,69 +41,84 @@ def __call__(self):
elif self.config.resume_failed == "skip":
continue
else:
raise ExecutionFailed(
f"{executable.module} <{executable.id})> has previously been executed unsuccessfully. Set `resume_failed` to True, 'new' or 'skip' to handle resubmission."
)
msg = f"{executable.module} <{executable.id})> has previously been executed unsuccessfully. Set `resume_failed` to True, 'new' or 'skip' to handle resubmission."
if self.config.dry:
print("Dry run ... ", msg)
continue

raise ExecutionFailed(msg)

resources = self.computed_resources(executable)
mpi = executable.config.get("mpi", self.config.mpi)
python = self.config.python or sys.executable

if mpi is None:
executable.dispatch()
else:
script = "#!/usr/bin/env bash\n"

if self.config.preamble:
script += self.config.preamble

script += executable.dispatch_code()
script = "#!/usr/bin/env bash\n"

script_file = chmodx(
self.save_file(
[executable.id, "mpi.sh"],
script,
)
)
if self.config.preamble:
script += self.config.preamble

cmd = [shutil.which(self.config.mpi)]
for k, v in resources.items():
if v not in [None, True]:
if k.startswith("--"):
cmd.append(f"{k}={v}")
else:
cmd.extend([k, str(v)])
else:
cmd.append(k)
cmd.append(script_file)
# add debug information
script += "\n"
script += f"# {executable.module} <{executable.id}>\n"
script += f"# {executable.local_directory()}>\n"
script += "\n"

print(" ".join(cmd))
script += executable.dispatch_code(python=python)

script_file = chmodx(
self.save_file(
[executable.id, "mpi.json"],
data={
"cmd": cmd,
"script": script,
},
[executable.id, "mpi.sh"],
script,
)
)

with open(
self.local_directory(executable.id, "output.log"),
"w",
buffering=1,
) as f:
try:
run_and_stream(
cmd,
stdout_handler=lambda o: [
sys.stdout.write(o),
f.write(o),
],
stderr_handler=lambda o: [
sys.stderr.write(o),
f.write(o),
],
)
except KeyboardInterrupt as _ex:
raise KeyboardInterrupt(
"Interrupting `" + " ".join(cmd) + "`"
) from _ex
if mpi is None:
cmd = []
else:
cmd = [mpi]
for k, v in resources.items():
if v is None or v is True:
cmd.append(k)
else:
if k.startswith("--"):
cmd.append(f"{k}={v}")
else:
cmd.extend([k, str(v)])

cmd.append(script_file)

print(" ".join(cmd))

self.save_file(
[executable.id, "mpi.json"],
data={
"cmd": cmd,
"script": script,
},
)

if self.config.dry:
print("Dry run ... ", executable)
continue

with open(
self.local_directory(executable.id, "output.log"),
"w",
buffering=1,
) as f:
try:
run_and_stream(
cmd,
stdout_handler=lambda o: [
sys.stdout.write(o),
f.write(o),
],
stderr_handler=lambda o: [
sys.stderr.write(o),
f.write(o),
],
)
except KeyboardInterrupt as _ex:
raise KeyboardInterrupt(
"Interrupting `" + " ".join(cmd) + "`"
) from _ex
32 changes: 29 additions & 3 deletions docs/examples/slurm-execution/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ class Config(BaseModel):
confirm: bool = True
copy_project_source: bool = True
resume_failed: Union[bool, Literal["new", "skip"]] = False
dry: bool = False

def on_before_dispatch(self):
if self.config.confirm:
Expand Down Expand Up @@ -148,7 +149,7 @@ def __call__(self):
)

source_code = Project.get().path()
if self.config.copy_project_source:
if self.config.copy_project_source and not self.config.dry:
print("Copy project source code ...")
source_code = self.local_directory(executable.id, "source_code")
cmd = ["rsync", "-a", Project.get().path(""), source_code]
Expand All @@ -159,7 +160,13 @@ def __call__(self):

resources = self.computed_resources(executable)
mpi = executable.config.get("mpi", self.config.mpi)
mpi_args = executable.config.get("mpi_args", self.config.mpi_args)
mpi_args = self.config.mpi_args
ranks = executable.config.get("ranks", None)
if ranks is not None:
if mpi_args:
mpi_args = mpi_args.replace("{ranks}", str(ranks))
else:
mpi_args = "-n " + str(executable.config.get("ranks"))
python = self.config.python or sys.executable

# usage dependencies
Expand Down Expand Up @@ -199,6 +206,12 @@ def __call__(self):
if self.config.preamble:
script += self.config.preamble

# add debug information
script += "\n"
script += f"# {executable.module} <{executable.id}>\n"
script += f"# {executable.local_directory()}>\n"
script += "\n"

if mpi:
if mpi[-1] != " ":
mpi += " "
Expand All @@ -222,6 +235,19 @@ def __call__(self):
cmd = ["sbatch", script_file]
print(" ".join(cmd))

self.save_file(
[executable.id, "slurm.json"],
data={
"job_id": None,
"cmd": sbatch_arguments,
"script": script,
},
)

if self.config.dry:
print("Dry run ... ", executable)
continue

try:
output = subprocess.run(
cmd,
Expand All @@ -243,7 +269,7 @@ def __call__(self):
f"{job_id} named `{resources['--job-name']}` for {executable.local_directory()} (output at {resources['--output']})"
)

# save job information
# update job information
jobs[executable.id] = job_id
self.save_file(
[executable.id, "slurm.json"],
Expand Down

0 comments on commit 17ba254

Please sign in to comment.