diff --git a/docs/examples/mpi-execution/mpi.py b/docs/examples/mpi-execution/mpi.py index 81cdf57d..5729533b 100644 --- a/docs/examples/mpi-execution/mpi.py +++ b/docs/examples/mpi-execution/mpi.py @@ -1,6 +1,5 @@ from typing import Literal, Optional, Union -import shutil import sys from machinable import Execution @@ -15,7 +14,9 @@ class Config(BaseModel): preamble: Optional[str] = "" mpi: Optional[str] = "mpirun" + python: Optional[str] = None resume_failed: Union[bool, Literal["new", "skip"]] = False + dry: bool = False def on_compute_default_resources(self, executable): resources = {} @@ -40,69 +41,84 @@ def __call__(self): elif self.config.resume_failed == "skip": continue else: - raise ExecutionFailed( - f"{executable.module} <{executable.id})> has previously been executed unsuccessfully. Set `resume_failed` to True, 'new' or 'skip' to handle resubmission." - ) + msg = f"{executable.module} <{executable.id})> has previously been executed unsuccessfully. Set `resume_failed` to True, 'new' or 'skip' to handle resubmission." + if self.config.dry: + print("Dry run ... ", msg) + continue + + raise ExecutionFailed(msg) resources = self.computed_resources(executable) mpi = executable.config.get("mpi", self.config.mpi) + python = self.config.python or sys.executable - if mpi is None: - executable.dispatch() - else: - script = "#!/usr/bin/env bash\n" - - if self.config.preamble: - script += self.config.preamble - - script += executable.dispatch_code() + script = "#!/usr/bin/env bash\n" - script_file = chmodx( - self.save_file( - [executable.id, "mpi.sh"], - script, - ) - ) + if self.config.preamble: + script += self.config.preamble - cmd = [shutil.which(self.config.mpi)] - for k, v in resources.items(): - if v not in [None, True]: - if k.startswith("--"): - cmd.append(f"{k}={v}") - else: - cmd.extend([k, str(v)]) - else: - cmd.append(k) - cmd.append(script_file) + # add debug information + script += "\n" + script += f"# {executable.module} <{executable.id}>\n" + script += f"# {executable.local_directory()}>\n" + script += "\n" - print(" ".join(cmd)) + script += executable.dispatch_code(python=python) + script_file = chmodx( self.save_file( - [executable.id, "mpi.json"], - data={ - "cmd": cmd, - "script": script, - }, + [executable.id, "mpi.sh"], + script, ) + ) - with open( - self.local_directory(executable.id, "output.log"), - "w", - buffering=1, - ) as f: - try: - run_and_stream( - cmd, - stdout_handler=lambda o: [ - sys.stdout.write(o), - f.write(o), - ], - stderr_handler=lambda o: [ - sys.stderr.write(o), - f.write(o), - ], - ) - except KeyboardInterrupt as _ex: - raise KeyboardInterrupt( - "Interrupting `" + " ".join(cmd) + "`" - ) from _ex + if mpi is None: + cmd = [] + else: + cmd = [mpi] + for k, v in resources.items(): + if v is None or v is True: + cmd.append(k) + else: + if k.startswith("--"): + cmd.append(f"{k}={v}") + else: + cmd.extend([k, str(v)]) + + cmd.append(script_file) + + print(" ".join(cmd)) + + self.save_file( + [executable.id, "mpi.json"], + data={ + "cmd": cmd, + "script": script, + }, + ) + + if self.config.dry: + print("Dry run ... ", executable) + continue + + with open( + self.local_directory(executable.id, "output.log"), + "w", + buffering=1, + ) as f: + try: + run_and_stream( + cmd, + stdout_handler=lambda o: [ + sys.stdout.write(o), + f.write(o), + ], + stderr_handler=lambda o: [ + sys.stderr.write(o), + f.write(o), + ], + ) + except KeyboardInterrupt as _ex: + raise KeyboardInterrupt( + "Interrupting `" + " ".join(cmd) + "`" + ) from _ex diff --git a/docs/examples/slurm-execution/slurm.py b/docs/examples/slurm-execution/slurm.py index 45792f70..738b6bdd 100644 --- a/docs/examples/slurm-execution/slurm.py +++ b/docs/examples/slurm-execution/slurm.py @@ -98,6 +98,7 @@ class Config(BaseModel): confirm: bool = True copy_project_source: bool = True resume_failed: Union[bool, Literal["new", "skip"]] = False + dry: bool = False def on_before_dispatch(self): if self.config.confirm: @@ -148,7 +149,7 @@ def __call__(self): ) source_code = Project.get().path() - if self.config.copy_project_source: + if self.config.copy_project_source and not self.config.dry: print("Copy project source code ...") source_code = self.local_directory(executable.id, "source_code") cmd = ["rsync", "-a", Project.get().path(""), source_code] @@ -159,7 +160,13 @@ def __call__(self): resources = self.computed_resources(executable) mpi = executable.config.get("mpi", self.config.mpi) - mpi_args = executable.config.get("mpi_args", self.config.mpi_args) + mpi_args = self.config.mpi_args + ranks = executable.config.get("ranks", None) + if ranks is not None: + if mpi_args: + mpi_args = mpi_args.replace("{ranks}", str(ranks)) + else: + mpi_args = "-n " + str(executable.config.get("ranks")) python = self.config.python or sys.executable # usage dependencies @@ -199,6 +206,12 @@ def __call__(self): if self.config.preamble: script += self.config.preamble + # add debug information + script += "\n" + script += f"# {executable.module} <{executable.id}>\n" + script += f"# {executable.local_directory()}>\n" + script += "\n" + if mpi: if mpi[-1] != " ": mpi += " " @@ -222,6 +235,19 @@ def __call__(self): cmd = ["sbatch", script_file] print(" ".join(cmd)) + self.save_file( + [executable.id, "slurm.json"], + data={ + "job_id": None, + "cmd": sbatch_arguments, + "script": script, + }, + ) + + if self.config.dry: + print("Dry run ... ", executable) + continue + try: output = subprocess.run( cmd, @@ -243,7 +269,7 @@ def __call__(self): f"{job_id} named `{resources['--job-name']}` for {executable.local_directory()} (output at {resources['--output']})" ) - # save job information + # update job information jobs[executable.id] = job_id self.save_file( [executable.id, "slurm.json"],