diff --git a/.github/threadpool.yaml b/.github/threadpool.yaml deleted file mode 100644 index 97404fad..00000000 --- a/.github/threadpool.yaml +++ /dev/null @@ -1,29 +0,0 @@ ---- -parsl_log_level: WARNING -retries: 0 -ModelEvaluation: - max_simulation_time: 0.4 - gpu: false - use_threadpool: true -ModelTraining: - max_training_time: 1 - gpu: true - use_threadpool: true - max_workers: 1 -CP2K: - cores_per_worker: 2 - max_evaluation_time: 0.3 - launch_command: 'apptainer exec -e --no-init oras://ghcr.io/molmod/cp2k:2024.1 /opt/entry.sh mpirun -bind-to core -np 2 -env OMP_NUM_THREADS 1 cp2k.psmp' -CP2K_container: - cores_per_worker: 2 - max_evaluation_time: 0.3 - launch_command: 'apptainer exec -e --no-init oras://ghcr.io/molmod/cp2k:2024.1 /opt/entry.sh mpirun -bind-to core -np 2 -env OMP_NUM_THREADS 1 cp2k.psmp' -GPAW: - cores_per_worker: 2 - max_evaluation_time: 0.3 - launch_command: 'apptainer exec -e --no-init oras://ghcr.io/molmod/gpaw:24.1 /opt/entry.sh mpirun -np 2 gpaw python /opt/run_gpaw.py' -GPAW_container: - cores_per_worker: 2 - max_evaluation_time: 0.3 - launch_command: 'apptainer exec -e --no-init oras://ghcr.io/molmod/gpaw:24.1 /opt/entry.sh mpirun -np 2 gpaw python /opt/run_gpaw.py' -... diff --git a/.github/workflows/run_pytest.yaml b/.github/workflows/run_pytest.yaml index d83b27e1..b835eacc 100644 --- a/.github/workflows/run_pytest.yaml +++ b/.github/workflows/run_pytest.yaml @@ -48,4 +48,4 @@ jobs: run: | pip install .[dev] pip list - pytest --skip-gpu --psiflow-config=.github/threadpool.yaml + pytest --skip-gpu diff --git a/configs/local_test.yaml b/configs/local_test.yaml new file mode 100644 index 00000000..3f9dc2bf --- /dev/null +++ b/configs/local_test.yaml @@ -0,0 +1,32 @@ +--- +parsl_log_level: WARNING +retries: 0 +make_symlinks: false + +ModelEvaluation: + gpu: false + use_threadpool: false + max_simulation_time: 1 + +ModelTraining: + gpu: true + use_threadpool: true + max_training_time: 1 + max_workers: 1 # suppress assertion for multigpu training + +CP2K: + cores_per_worker: 1 + max_evaluation_time: 0.1 + container_uri: 'oras://ghcr.io/molmod/cp2k:2024.1' + +GPAW: + cores_per_worker: 1 + max_evaluation_time: 0.1 + container_uri: 'oras://ghcr.io/molmod/gpaw:24.1' + +ORCA: + cores_per_worker: 1 + max_evaluation_time: 0.1 + + +... diff --git a/configs/hortense.yaml b/configs/old_hortense.yaml similarity index 100% rename from configs/hortense.yaml rename to configs/old_hortense.yaml diff --git a/configs/lumi.yaml b/configs/old_lumi.yaml similarity index 100% rename from configs/lumi.yaml rename to configs/old_lumi.yaml diff --git a/configs/threadpool.yaml b/configs/old_threadpool.yaml similarity index 100% rename from configs/threadpool.yaml rename to configs/old_threadpool.yaml diff --git a/configs/wq.yaml b/configs/old_wq.yaml similarity index 100% rename from configs/wq.yaml rename to configs/old_wq.yaml diff --git a/psiflow/__init__.py b/psiflow/__init__.py index fbcaec46..10816e2c 100644 --- a/psiflow/__init__.py +++ b/psiflow/__init__.py @@ -33,3 +33,42 @@ def resolve_and_check(path: Path) -> Path: load = ExecutionContextLoader.load context = ExecutionContextLoader.context wait = ExecutionContextLoader.wait + + +# TODO: EXECUTION +# - max_runtime is in seconds, max_simulation_time (and others) is in minutes +# - ExecutionDefinition gpu argument? +# - ExecutionDefinition wrap_in_timeout functionality +# - ExecutionDefinition wrap_in_srun functionality? Actually for MD this is more iffy right +# - ExecutionDefinition why properties? +# - ExecutionDefinition centralize wq_resources +# - configuration file with all options +# - timeout -s 9 or -s 15? +# - executor keys are hardcoded strings.. +# - define a 'format_env_variables' util +# - update GPAW + ORCA + Default containers +# include s-dftd3 in modelevaluation + install s-dftd3 with openmp +# - what with mem_per_node / mem_per_worker +# - always GPU for training? +# - currently reference mpi_args have to be tuple according to typeguard.. +# - cores_per_block has to be specified even when exclusive..? +# - can we do something with WQ priority? +# - see chatgpt convo for process memory limits and such +# - make /tmp for app workdirs an option? +# - what is scaling_cores_per_worker in WQ +# - can we clean up psiflow_internal slightly? +# - +# TODO: REFERENCE +# - reference MPI args not really checked +# - mpi flags are very finicky across implementations --> use ENV args? +# OMPI_MCA_orte_report_bindings=1 I_MPI_DEBUG=4 +# - commands ends with 'exit 0' - what if we do not want to exit yet? +# - some actual logging? +# - safe_compute_dataset functionality? +# - ReferenceDummy is a bit sloppy +# - +# TODO: MISC +# - think about test efficiency +# - some imports take a very long time +# - fix serialisation +# - diff --git a/psiflow/data/dataset.py b/psiflow/data/dataset.py index d0fa168c..a2e866e9 100644 --- a/psiflow/data/dataset.py +++ b/psiflow/data/dataset.py @@ -284,6 +284,14 @@ def evaluate( Returns: Dataset: A new Dataset with evaluation results. """ + # TODO: WIP + from psiflow.hamiltonians import Hamiltonian + + if not isinstance(computable, Hamiltonian): + # avoid extracting and inserting the same quantities + return computable.compute_dataset(self) + + # use Hamiltonian.compute method if batch_size is not None: outputs = computable.compute(self, batch_size=batch_size) else: diff --git a/psiflow/execution.py b/psiflow/execution.py index be733cb2..9653031d 100644 --- a/psiflow/execution.py +++ b/psiflow/execution.py @@ -5,11 +5,12 @@ import re import shutil import sys +from dataclasses import dataclass from pathlib import Path from threading import Lock # see https://stackoverflow.com/questions/59904631/python-class-constants-in-dataclasses -from typing import Any, Optional, Union +from typing import Any, Optional, Union, ClassVar, Protocol import parsl import psutil @@ -34,6 +35,53 @@ logger = logging.getLogger(__name__) # logging per module +PSIFLOW_INTERNAL = "psiflow_internal" + +EXECUTION_KWARGS = ( + "container_uri", + "container_engine", + "container_addopts", + "container_entrypoint", +) + + +@dataclass +class ContainerSpec: + uri: str + engine: str = "apptainer" + addopts: str = " --no-eval -e --no-mount home -W /tmp --writable-tmpfs" + entrypoint: str = "/opt/entry.sh" + + def __post_init__(self): + assert self.engine in ("apptainer", "singularity") + assert len(self.uri) > 0 + + def launch_command(self, gpu: bool = False) -> str: + # TODO: pretty sure some of this is overkill + pwd = Path.cwd().resolve() # access to data / internal dir + args = [self.engine, "exec", self.addopts, f"--bind {pwd}"] + if gpu: + if "rocm" in self.uri: + args.append("--rocm") + else: + args.append("--nv") + args += [self.uri, self.entrypoint] + return " ".join(args) + + @staticmethod + def from_kwargs(kwargs: dict) -> Optional[ContainerSpec]: + if "container_uri" not in kwargs: + return None + keys = ( + "container_uri", + "container_engine", + "container_addopts", + "container_entrypoint", + ) + args = [kwargs[key] for key in keys if key in kwargs] + return ContainerSpec(*args) # TODO: slightly hacky + + @typeguard.typechecked class ExecutionDefinition: def __init__( @@ -42,17 +90,21 @@ def __init__( gpu: bool, cores_per_worker: int, use_threadpool: bool, - worker_prepend: str, + container: Optional[ContainerSpec], max_workers: Optional[int] = None, + **kwargs, ) -> None: self.parsl_provider = parsl_provider self.gpu = gpu self.cores_per_worker = cores_per_worker self.use_threadpool = use_threadpool - self.worker_prepend = worker_prepend - self.name = self.__class__.__name__ + self.container = container self._max_workers = max_workers + @property + def name(self) -> str: + return self.__class__.__name__ + @property def cores_available(self): if type(self.parsl_provider) is LocalProvider: # noqa: F405 @@ -78,7 +130,7 @@ def max_runtime(self): walltime = 1e9 return walltime - def create_executor(self, path: Path, **kwargs) -> ParslExecutor: + def create_executor(self, path: Path) -> ParslExecutor: if self.use_threadpool: executor = ThreadPoolExecutor( max_threads=self.cores_per_worker, @@ -101,6 +153,13 @@ def create_executor(self, path: Path, **kwargs) -> ParslExecutor: else: worker_options.append("--idle-timeout={}".format(20)) + # only ModelEvaluation / ModelTraining / default_htex run in containers + if not isinstance(self, ReferenceEvaluation) and self.container: + prepend = self.container.launch_command() + worker_executable = f"{prepend} work_queue_worker" + else: + worker_executable = "work_queue_worker" + executor = MyWorkQueueExecutor( label=self.name, working_dir=str(path / self.name), @@ -111,7 +170,7 @@ def create_executor(self, path: Path, **kwargs) -> ParslExecutor: max_retries=1, coprocess=False, worker_options=" ".join(worker_options), - worker_executable="{} work_queue_worker".format(self.worker_prepend), + worker_executable=worker_executable, scaling_cores_per_worker=cores, ) return executor @@ -122,7 +181,7 @@ def from_config( gpu: bool = False, cores_per_worker: int = 1, use_threadpool: bool = False, - container: Optional[dict] = None, + container: Optional[ContainerSpec] = None, **kwargs, ): # search for any section in the config which defines the Parsl ExecutionProvider @@ -132,8 +191,7 @@ def from_config( provider_cls = SlurmProvider provider_kwargs = kwargs.pop("slurm") # do not allow empty dict provider_kwargs["init_blocks"] = 0 - if "exclusive" not in provider_kwargs: - provider_kwargs["exclusive"] = False + provider_kwargs.setdefault("exclusive", False) else: provider_cls = LocalProvider # noqa: F405 provider_kwargs = kwargs.pop("local", {}) @@ -145,10 +203,8 @@ def from_config( launcher = SimpleLauncher() if container is not None: + # TODO: why not exactly? assert not use_threadpool - worker_prepend = container_launch_command(gpu=gpu, **container) - else: - worker_prepend = "" # initialize provider parsl_provider = provider_cls( @@ -159,7 +215,7 @@ def from_config( parsl_provider=parsl_provider, gpu=gpu, use_threadpool=use_threadpool, - worker_prepend=worker_prepend, + container=container, cores_per_worker=cores_per_worker, **kwargs, ) @@ -229,7 +285,7 @@ def get_client_args( def wq_resources(self, nwalkers): if self.use_threadpool: - return None + return {} nclients = min(nwalkers, self.max_workers) resource_specification = {} resource_specification["cores"] = nclients * self.cores_per_worker @@ -314,73 +370,48 @@ def wq_resources(self): class ReferenceEvaluation(ExecutionDefinition): def __init__( self, - name: str, - launch_command: Optional[str] = None, + spec: ReferenceSpec, max_evaluation_time: Optional[float] = None, memory_limit: Optional[str] = None, **kwargs, ) -> None: + # TODO: how to know which code? super().__init__(**kwargs) - self.name = name # override name + self.spec = spec + self.max_evaluation_time = max_evaluation_time * 60 # seconds + if max_evaluation_time: + assert 0 < max_evaluation_time < self.max_runtime + self.memory_limit = memory_limit - if launch_command is None: - launch_command = self.default_launch_command - self.launch_command = launch_command + def command(self): + launch_command = self.spec.launch_command() + kwargs = {k: getattr(self, k) for k in self.spec.reference_args} + launch_command = launch_command.format(**kwargs) - if max_evaluation_time is None: - max_evaluation_time = self.max_runtime / 60 - assert max_evaluation_time * 60 <= self.max_runtime - self.max_evaluation_time = max_evaluation_time + if self.container is not None: + launch_command = f"{self.container.launch_command()} {launch_command}" - self.memory_limit = memory_limit - - @property - def default_launch_command(self): - if self.name.startswith("CP2K"): - ranks = self.cores_per_worker # use nprocs = ncores, nthreads = 1 - mpi_command = "mpirun -np {} ".format(ranks) - mpi_command += "-x OMP_NUM_THREADS=1 " # cp2k runs best with these settings - mpi_command += "--bind-to core --map-by core" # set explicitly - command = "cp2k.psmp -i cp2k.inp" - return " ".join([mpi_command, command]) - if self.name.startswith("GPAW"): - ranks = self.cores_per_worker # use nprocs = ncores, nthreads = 1 - mpi_command = "mpirun -np {} ".format(ranks) - mpi_command += "-x OMP_NUM_THREADS=1 " # cp2k runs best with these settings - mpi_command += "--bind-to core --map-by core" # set explicitly - script = "$(python -c 'import psiflow.reference.gpaw_; print(psiflow.reference.gpaw_.__file__)')" - return " ".join([mpi_command, "gpaw", "python", script]) - if self.name.startswith("ORCA"): - raise ValueError('provide path to ORCA executable via "launch_command"') + if (max_time := self.max_evaluation_time) is None: + # leave some slack for startup and cleanup + max_time = max(0.9 * self.max_runtime, self.max_runtime - 5) + launch_command = f"timeout -s 9 {max_time}s {launch_command}" - def command(self): - extra_commands = [] - - launch_command = self.launch_command - if self.name.startswith("CP2K"): - launch_command += " -i cp2k.inp" # add input file - elif self.name.startswith("GPAW"): - launch_command += " input.json" - if self.max_evaluation_time is not None: - max_time = 0.9 * (60 * self.max_evaluation_time) - launch_command = "timeout -s 9 {}s {}".format(max_time, launch_command) + commands = [] if self.memory_limit is not None: # based on https://stackoverflow.com/a/42865957/2002471 units = {"KB": 1, "MB": 2**10, "GB": 2**20, "TB": 2**30} - def parse_size(size): + def parse_size(size): # TODO: to utils? size = size.upper() if not re.match(r" ", size): size = re.sub(r"([KMGT]?B)", r" \1", size) number, unit = [string.strip() for string in size.split()] return int(float(number) * units[unit]) - actual = parse_size(self.memory_limit) - extra_commands.append("ulimit -v {}".format(actual)) + commands.append(f"ulimit -v {parse_size(self.memory_limit)}") - body = "; ".join(extra_commands + [launch_command, "exit 0"]) + "; " - command = " { " + body + " } " - return command + # exit code 0 so parsl always thinks bash app succeeded + return "\n".join([*commands, launch_command, "exit 0"]) def wq_resources(self): if self.use_threadpool: @@ -393,6 +424,72 @@ def wq_resources(self): resource_specification["running_time_min"] = self.max_evaluation_time return resource_specification + @property + def name(self) -> str: + return self.spec.name + + +class ReferenceSpec(Protocol): + name: ClassVar[str] + reference_args: ClassVar[tuple[str, ...]] + mpi_command: str + mpi_args: tuple[str, ...] + executable: str + + def launch_command(self) -> str: + raise NotImplementedError + + +@dataclass +class CP2KReferenceSpec(ReferenceSpec): + name = "CP2K" + reference_args = ("cores_per_worker",) + mpi_command: str = "mpirun -np {cores_per_worker}" + mpi_args: tuple[str, ...] = ( + "-ENV OMP_NUM_THREADS=1", + "--bind-to core", + "--map-by core", + ) + executable: str = "cp2k.psmp -i cp2k.inp" + + def launch_command(self): + # use nprocs = ncores, nthreads = 1 + return " ".join([self.mpi_command, *self.mpi_args, self.executable]) + + +@dataclass +class GPAWReferenceSpec(ReferenceSpec): + name = "GPAW" + reference_args = ("cores_per_worker",) + mpi_command: str = "mpirun -np {cores_per_worker}" + mpi_args: tuple[str, ...] = ( + "-x OMP_NUM_THREADS=1", + "--bind-to core", + "--map-by core", + ) + executable: str = "gpaw python script_gpaw.py input.json" + + def launch_command(self): + # use nprocs = ncores, nthreads = 1 + return " ".join([self.mpi_command, *self.mpi_args, self.executable]) + + +@dataclass +class ORCAReferenceSpec(ReferenceSpec): + name = "ORCA" + reference_args = () + mpi_command: str = "" + mpi_args: tuple[str, ...] = ( + "-x OMP_NUM_THREADS=1", + "--bind-to core", + "--map-by core", + ) + executable: str = "$(which orca) orca.inp" + + def launch_command(self): + mpi_str = " ".join(self.mpi_args) + return f'{self.executable} "{mpi_str}"' + @typeguard.typechecked class ExecutionContext: @@ -457,14 +554,10 @@ def from_config( default_threads: int = 4, htex_address: str = "127.0.0.1", zip_staging: Optional[bool] = None, - container_uri: Optional[str] = None, - container_engine: str = "apptainer", - container_addopts: str = " --no-eval -e --no-mount home -W /tmp --writable-tmpfs", - container_entrypoint: str = "/opt/entry.sh", - make_symlinks: bool = True, + make_symlinks: bool = False, **kwargs, ) -> ExecutionContext: - path = Path.cwd().resolve() / "psiflow_internal" + path = Path.cwd().resolve() / PSIFLOW_INTERNAL psiflow.resolve_and_check(path) if path.exists(): shutil.rmtree(path) @@ -474,31 +567,25 @@ def from_config( name="parsl", level=getattr(logging, parsl_log_level), ) - if container_uri is not None: - container = { - "uri": container_uri, - "engine": container_engine, - "addopts": container_addopts, - "entrypoint": container_entrypoint, - } - else: - container = None # create definitions + base_container = ContainerSpec.from_kwargs(kwargs) + kwargs.pop("container_uri", None) model_evaluation = ModelEvaluation.from_config( - container=container, + container=base_container, **kwargs.pop("ModelEvaluation", {}), ) model_training = ModelTraining.from_config( - container=container, - **kwargs.pop("ModelTraining", {'gpu': True}), # avoid triggering assertion + container=base_container, + **kwargs.pop("ModelTraining", {"gpu": True}), # avoid triggering assertion ) reference_evaluations = [] # reference evaluations might be class specific for key in list(kwargs.keys()): - if key[:4] in ["CP2K", "GPAW"]: + if key[:4] in REFERENCE_SPECS: # allow for e.g., CP2K_small config = kwargs.pop(key) reference_evaluation = ReferenceEvaluation.from_config( - name=key, + spec=init_spec(REFERENCE_SPECS[key[:4]], config), + container=ContainerSpec.from_kwargs(kwargs | config), **config, ) reference_evaluations.append(reference_evaluation) @@ -508,8 +595,8 @@ def from_config( executors = [d.create_executor(path=path) for d in definitions] # create default executors - if container is not None: - launcher = WrappedLauncher(prepend=container_launch_command(**container)) + if base_container is not None: + launcher = WrappedLauncher(prepend=base_container.launch_command()) else: launcher = SimpleLauncher() htex = HighThroughputExecutor( @@ -521,13 +608,12 @@ def from_config( cpu_affinity="none", provider=LocalProvider(launcher=launcher, init_blocks=0), # noqa: F405 ) - executors.append(htex) threadpool = ThreadPoolExecutor( label="default_threads", max_threads=default_threads, working_dir=str(path), ) - executors.append(threadpool) + executors.extend([htex, threadpool]) # remove additional kwargs # if zip_staging: @@ -614,33 +700,6 @@ def wait(cls): parsl.wait_for_current_tasks() -@typeguard.typechecked -def container_launch_command( - uri: str, - engine: str = "apptainer", - gpu: bool = False, - addopts: str = " --no-eval -e --no-mount home -W /tmp --writable-tmpfs", - entrypoint: str = "/opt/entry.sh", -) -> str: - assert engine in ["apptainer", "singularity"] - assert len(uri) > 0 - - launch_command = "" - launch_command += engine - launch_command += " exec " - launch_command += addopts - launch_command += " --bind {}".format( - Path.cwd().resolve() - ) # access to data / internal dir - if gpu: - if "rocm" in uri: - launch_command += " --rocm" - else: # default - launch_command += " --nv" - launch_command += " {} {} ".format(uri, entrypoint) - return launch_command - - class SlurmLauncher(Launcher): def __init__(self, debug: bool = True, overrides: str = ""): super().__init__(debug=debug) @@ -690,3 +749,16 @@ def _create_symlink(src: Path, dest: Path, is_dir: bool = False) -> None: else: dest.touch(exist_ok=True) src.symlink_to(dest, target_is_directory=is_dir) + + +REFERENCE_SPECS = { + "CP2K": CP2KReferenceSpec, + "GPAW": GPAWReferenceSpec, + "ORCA": ORCAReferenceSpec, +} + + +def init_spec(spec_cls: type(ReferenceSpec), kwargs: dict) -> ReferenceSpec: + keys = ("mpi_command", "mpi_args", "executable") + cls_kwargs = {k: kwargs[k] for k in keys if k in kwargs} + return spec_cls(**cls_kwargs) diff --git a/psiflow/functions.py b/psiflow/functions.py index 6b735e4e..e12bec91 100644 --- a/psiflow/functions.py +++ b/psiflow/functions.py @@ -6,15 +6,32 @@ from pathlib import Path from typing import ClassVar, Optional, Type, Union, get_type_hints +import ase import numpy as np import typeguard from ase import Atoms from ase.data import atomic_masses, chemical_symbols from ase.units import fs, kJ, mol, nm +from ase.stress import voigt_6_to_full_3x3_stress from psiflow.geometry import Geometry, NullState, create_outputs +def format_output( + geometry: Geometry, + energy: float, + forces: np.ndarray | None = None, + stress: np.ndarray | None = None, + **kwargs, +) -> dict: + """""" + forces = np.zeros(shape=(len(geometry), 3)) if forces is None else forces + stress = np.zeros(shape=(3, 3)) if stress is None else stress + if stress.size == 6: + stress = voigt_6_to_full_3x3_stress(stress) + return {"energy": energy, "forces": forces, "stress": stress} + + @typeguard.typechecked @dataclass class Function: @@ -39,9 +56,9 @@ def compute( if geometry == NullState: continue out = self(geometry) - value[i] = out['energy'] - grad_pos[i, :len(geometry)] = out['forces'] - grad_cell[i] = out['stress'] + value[i] = out["energy"] + grad_pos[i, : len(geometry)] = out["forces"] + grad_cell[i] = out["stress"] return {"energy": value, "forces": grad_pos, "stress": grad_cell} @@ -64,13 +81,11 @@ def __call__( delta = geometry.per_atom.positions - self.centers energy = self.force_constant * np.sum(delta**2) / 2 grad_pos = (-1.0) * self.force_constant * delta + grad_cell = None if geometry.periodic and self.volume > 0.0: delta = np.linalg.det(geometry.cell) - self.volume - _stress = self.force_constant * np.eye(3) * delta - else: - _stress = np.zeros((3, 3)) - grad_cell = _stress - return {"energy": energy, "forces": grad_pos, "stress": grad_cell} + grad_cell = self.force_constant * np.eye(3) * delta + return format_output(geometry, energy, grad_pos, grad_cell) @typeguard.typechecked @@ -94,9 +109,7 @@ def __call__( if key not in self.plumed_instances: from plumed import Plumed - tmp = tempfile.NamedTemporaryFile( - prefix="plumed_", mode="w+", delete=False - ) + tmp = tempfile.NamedTemporaryFile(prefix="plumed_", mode="w+", delete=False) # plumed creates a back up if this file would already exist os.remove(tmp.name) plumed_ = Plumed() @@ -136,11 +149,10 @@ def __call__( plumed_.cmd("prepareCalc") plumed_.cmd("performCalcNoUpdate") plumed_.cmd("getBias", energy) + stress = None if geometry.periodic: stress = virial / np.linalg.det(geometry.cell) - else: - stress = np.zeros((3, 3)) - return {"energy": float(energy.item()), "forces": forces, "stress": stress} + return format_output(geometry, float(energy.item()), forces, stress) @staticmethod def _geometry_to_key(geometry: Geometry) -> tuple: @@ -154,7 +166,7 @@ def __call__( self, geometry: Geometry, ) -> dict[str, float | np.ndarray]: - return {"energy": 0., "forces": np.zeros(shape=(len(geometry), 3)), "stress": np.zeros(shape=(3, 3))} + return format_output(geometry, 0) @typeguard.typechecked @@ -173,7 +185,7 @@ def __call__( energy = 0.5 * np.dot(delta, grad) if self.energy is not None: energy += self.energy - return {"energy": energy, "forces": (-1.0) * grad.reshape(-1, 3), "stress": np.zeros(shape=(3, 3))} + return format_output(geometry, energy, (-1.0) * grad.reshape(-1, 3)) @typeguard.typechecked @@ -228,7 +240,9 @@ def get_atomic_energy(self, geometry): try: total += counts[idx] * self.atomic_energies[symbol] except KeyError: - warnings.warn(f'(MACEFunction) No atomic energy entry for symbol "{symbol}". Are you sure?') + warnings.warn( + f'(MACEFunction) No atomic energy entry for symbol "{symbol}". Are you sure?' + ) return total def __call__( @@ -241,7 +255,11 @@ def __call__( # TODO: is this call necessary? # torch_tools.set_default_dtype(self.dtype) - energy, forces, stress = 0.0, np.zeros(shape=(len(geometry), 3)), np.zeros(shape=(3, 3)) + energy, forces, stress = ( + 0.0, + np.zeros(shape=(len(geometry), 3)), + np.zeros(shape=(3, 3)), + ) # compute offset if possible if self.atomic_energies: @@ -255,14 +273,49 @@ def __call__( pbc=geometry.periodic, ) config = data.config_from_atoms(atoms) - data = data.AtomicData.from_config(config, z_table=self.z_table, cutoff=self.r_max) + data = data.AtomicData.from_config( + config, z_table=self.z_table, cutoff=self.r_max + ) batch = Batch.from_data_list([data]).to(device=self.device) out = self.model(batch.to_dict(), compute_stress=cell is not None) energy += out["energy"].detach().cpu().item() forces += out["forces"].detach().cpu().numpy() if cell is not None: stress += out["stress"].detach().cpu().numpy().squeeze() - return {"energy": energy, "forces": forces, "stress": stress} + return format_output(geometry, energy, forces, stress) + + +@typeguard.typechecked +@dataclass +class DispersionFunction(EnergyFunction): + method: str + damping: str = "d3bj" + params_tweaks: Optional[dict[str, float]] = None + num_threads: int = 1 + + def __post_init__(self): + # OMP_NUM_THREADS for parallel evaluation does not work.. + # https://github.com/dftd3/simple-dftd3/issues/49 + os.environ["OMP_NUM_THREADS"] = str(self.num_threads * 10) + + from dftd3.ase import DFTD3 + + self.calc = DFTD3( + method=self.method, damping=self.damping, params_tweaks=self.params_tweaks + ) + + def __call__( + self, + geometry: Geometry, + ) -> dict[str, float | np.ndarray]: + atoms = ase.Atoms( # TODO: geometry.to_atoms? + numbers=geometry.per_atom.numbers, + positions=geometry.per_atom.positions, + cell=geometry.cell, + pbc=geometry.periodic, + ) + self.calc.calculate(atoms) + return format_output(geometry, **self.calc.results) def _apply( @@ -294,6 +347,7 @@ def function_from_json(path: Union[str, Path], **kwargs) -> Function: HarmonicFunction, MACEFunction, PlumedFunction, + DispersionFunction, None, ] with open(path, "r") as f: @@ -311,4 +365,3 @@ def function_from_json(path: Union[str, Path], **kwargs) -> Function: data[key] = value function = function_cls(**data) return function - diff --git a/psiflow/geometry.py b/psiflow/geometry.py index daa9241b..4e00d4cc 100644 --- a/psiflow/geometry.py +++ b/psiflow/geometry.py @@ -423,7 +423,7 @@ def from_atoms(cls, atoms: Atoms) -> Geometry: per_atom = np.recarray(len(atoms), dtype=per_atom_dtype) per_atom.numbers[:] = atoms.numbers.astype(np.uint8) per_atom.positions[:] = atoms.get_positions() - per_atom.forces[:] = atoms.arrays.get("forces", np.nan) + per_atom.forces[:] = atoms.arrays.get("forces", np.nan) # TODO: ASE stores forces in calc now if np.any(atoms.pbc): cell = np.array(atoms.cell) else: diff --git a/psiflow/hamiltonians.py b/psiflow/hamiltonians.py index 13b53603..df1e39d2 100644 --- a/psiflow/hamiltonians.py +++ b/psiflow/hamiltonians.py @@ -20,6 +20,7 @@ MACEFunction, PlumedFunction, ZeroFunction, + DispersionFunction, _apply, ) from psiflow.geometry import Geometry @@ -28,21 +29,20 @@ from psiflow.utils.io import dump_json -# TODO: executors not very consistent for some reason (default_threads <> default_htex) - @typeguard.typechecked @psiflow.serializable class Hamiltonian(Computable): + # TODO: app is actually an instance variable, but serialization complains.. outputs: ClassVar[tuple] = ("energy", "forces", "stress") batch_size = 1000 - app: ClassVar[Callable] # TODO: app is actually an instance variable, but serialization complains.. + app: ClassVar[Callable] function_name: ClassVar[str] def compute( self, arg: Union[Dataset, AppFuture[list], list, AppFuture, Geometry], *outputs: Optional[str], - batch_size: Optional[int] = -1, # if -1: take class default TODO: why not just None? + batch_size: Optional[int] = -1, # if -1: take class default TODO: why? ) -> Union[list[AppFuture], AppFuture]: if len(outputs) == 0: outputs = tuple(self.__class__.outputs) @@ -64,7 +64,7 @@ def __mul__(self, a: float) -> MixtureHamiltonian: def __add__(self, hamiltonian: Hamiltonian) -> Hamiltonian: if type(hamiltonian) is Zero: return self - mixture = MixtureHamiltonian([self], [1.]) + mixture = MixtureHamiltonian([self], [1.0]) return mixture.__add__(hamiltonian) def __sub__(self, hamiltonian: Hamiltonian) -> Hamiltonian: @@ -104,12 +104,12 @@ def __mul__(self, a: float) -> Hamiltonian: return Zero() def __add__(self, hamiltonian: Hamiltonian) -> Hamiltonian: - return hamiltonian # (Zero + Hamiltonian) is different from (Hamiltonian + Zero) + # (Zero + Hamiltonian) is different from (Hamiltonian + Zero) + return hamiltonian __rmul__ = __mul__ # handle float * Zero - @typeguard.typechecked @psiflow.serializable class MixtureHamiltonian(Hamiltonian): @@ -169,11 +169,11 @@ def __mul__(self, a: float) -> MixtureHamiltonian: def __len__(self) -> int: return len(self.coefficients) - def __add__(self, hamiltonian: Hamiltonian | MixtureHamiltonian) -> MixtureHamiltonian: + def __add__(self, hamiltonian: Hamiltonian) -> MixtureHamiltonian: if type(hamiltonian) is Zero: return self if type(hamiltonian) is not MixtureHamiltonian: - hamiltonian = 1.0 * hamiltonian # turn into mixture + hamiltonian = 1.0 * hamiltonian # turn into mixture coefficients = list(hamiltonian.coefficients) hamiltonians = list(hamiltonian.hamiltonians) @@ -208,9 +208,12 @@ def get_coefficient(self, hamiltonian: Hamiltonian) -> Optional[float]: return None return self.coefficients[idx] - def get_coefficients(self, mixture: MixtureHamiltonian) -> Optional[tuple[float, ...]]: + def get_coefficients( + self, mixture: MixtureHamiltonian + ) -> Optional[tuple[float, ...]]: assert type(mixture) is MixtureHamiltonian - for h in mixture.hamiltonians: # every mixture component must be a component of self + for h in mixture.hamiltonians: + # every mixture component must be a component of self if h not in self.hamiltonians: return None @@ -263,9 +266,9 @@ def parameters(self) -> dict: def __eq__(self, hamiltonian: Hamiltonian | EinsteinCrystal) -> bool: if ( - not isinstance(hamiltonian, EinsteinCrystal) or - not np.allclose(self.force_constant, hamiltonian.force_constant) or - self.reference_geometry != hamiltonian.reference_geometry + not isinstance(hamiltonian, EinsteinCrystal) + or not np.allclose(self.force_constant, hamiltonian.force_constant) + or self.reference_geometry != hamiltonian.reference_geometry ): return False return True @@ -310,8 +313,8 @@ def parameters(self) -> dict: def __eq__(self, other: Hamiltonian | PlumedHamiltonian) -> bool: if ( - not isinstance(other, PlumedHamiltonian) or - self.plumed_input != other.plumed_input + not isinstance(other, PlumedHamiltonian) + or self.plumed_input != other.plumed_input ): return False return True @@ -329,7 +332,8 @@ def __init__( reference_geometry: Union[Geometry, AppFuture[Geometry]], hessian: Union[np.ndarray, AppFuture[np.ndarray]], ): - self.reference_geometry = reference_geometry # TODO: why not copy_app_future(geometry) like others? + # TODO: why not copy_app_future(geometry) like others? + self.reference_geometry = reference_geometry self.hessian = hessian self._create_apps() @@ -367,12 +371,51 @@ def __eq__(self, hamiltonian: Hamiltonian | Harmonic) -> bool: ) else: equal = hamiltonian.hessian == self.hessian - if not equal: return False return True +@typeguard.typechecked +@psiflow.serializable +class D3Hamiltonian(Hamiltonian): + method: str + damping: str + function_name: ClassVar[str] = "DispersionFunction" + + def __init__(self, method: str, damping: str = "d3bj"): + self.method, self.damping = method, damping + self._create_apps() + + def _create_apps(self): + # TODO: does this make sense? GPU settings are useless for example + evaluation = psiflow.context().definitions["ModelEvaluation"] + apply_app = python_app(_apply, executors=["ModelEvaluation"]) + resources = evaluation.wq_resources(1) + resources.pop("gpus", None) + + # execution-side parameters of function are not included in self.parameters() + self.app = partial( + apply_app, + function_cls=DispersionFunction, + parsl_resource_specification=resources, + **self.parameters(), + num_threads=resources.get("cores", 1), # TODO: sloppy + ) + + def parameters(self) -> dict: + return {"method": self.method, "damping": self.damping} + + def __eq__(self, hamiltonian: Hamiltonian | D3Hamiltonian) -> bool: + if ( + not isinstance(hamiltonian, D3Hamiltonian) + or self.method != hamiltonian.method + or self.damping != hamiltonian.damping + ): + return False + return True + + @typeguard.typechecked @psiflow.serializable class MACEHamiltonian(Hamiltonian): @@ -433,7 +476,7 @@ def __eq__(self, hamiltonian: Hamiltonian | MACEHamiltonian) -> bool: return True # TODO: the methods below are outdated.. - + @classmethod def mace_mp0(cls, size: str = "small") -> MACEHamiltonian: urls = dict( diff --git a/psiflow/learning.py b/psiflow/learning.py index f1e72e37..3191a14c 100644 --- a/psiflow/learning.py +++ b/psiflow/learning.py @@ -15,7 +15,7 @@ from psiflow.hamiltonians import Hamiltonian, Zero from psiflow.metrics import Metrics from psiflow.models import Model -from psiflow.reference import Reference, evaluate +from psiflow.reference import Reference from psiflow.sampling import SimulationOutput, Walker, sample from psiflow.utils.apps import boolean_or, isnan, setup_logger, unpack_i @@ -66,7 +66,7 @@ def evaluate_outputs( metrics: Metrics, ) -> tuple[Union[int, AppFuture], Dataset, list[AppFuture]]: states = [o.get_state() for o in outputs] # take exit status into account - eval_ref = [evaluate(s, reference) for s in states] + eval_ref = [reference.evaluate(s) for s in states] eval_mod = Dataset(states).evaluate(hamiltonian) errors = [compute_error(s, eval_mod[i]) for i, s in enumerate(eval_ref)] processed_states = [] diff --git a/psiflow/models/_mace.py b/psiflow/models/_mace.py index d63bcde8..5ed67724 100644 --- a/psiflow/models/_mace.py +++ b/psiflow/models/_mace.py @@ -14,6 +14,7 @@ from psiflow.hamiltonians import MACEHamiltonian from psiflow.models.model import Model +# TODO: not used logger = logging.getLogger(__name__) # logging per module @@ -225,7 +226,8 @@ def _create_apps(self): # initialize apps app_initialize = bash_app(initialize, executors=[evaluation.name]) resources_init = evaluation.wq_resources(1) - if resources_init is not None: + # TODO: find a better way for model init + if not evaluation.use_threadpool: resources_init["running_time_min"] = 30 # at least 30 mins for init? app_train = bash_app(train, executors=[training.name]) resources_train = training.wq_resources() diff --git a/psiflow/reference/__init__.py b/psiflow/reference/__init__.py index 740172c5..621dd55f 100644 --- a/psiflow/reference/__init__.py +++ b/psiflow/reference/__init__.py @@ -1,4 +1,6 @@ -from ._cp2k import CP2K # noqa: F401 -from ._dftd3 import D3 # noqa: F401 +# module naming is chosen to avoid import conflicts +from .cp2k_ import CP2K # noqa: F401 from .gpaw_ import GPAW # noqa: F401 -from .reference import Reference, evaluate # noqa: F401 +from .orca_ import ORCA, create_input_template as create_orca_input # noqa: F401 +from .reference import Reference # noqa: F401 +from .dummy import ReferenceDummy # noqa: F401 diff --git a/psiflow/reference/_cp2k.py b/psiflow/reference/_cp2k.py deleted file mode 100644 index 7eba7354..00000000 --- a/psiflow/reference/_cp2k.py +++ /dev/null @@ -1,289 +0,0 @@ -from __future__ import annotations # necessary for type-guarding class methods - -import copy -import io -import logging -from functools import partial -from typing import Optional, Union - -import numpy as np -import typeguard -from ase.data import atomic_numbers -from ase.units import Bohr, Ha -from cp2k_input_tools.generator import CP2KInputGenerator -from cp2k_input_tools.parser import CP2KInputParserSimplified -from parsl.app.app import bash_app, python_app - -import psiflow -from psiflow.geometry import Geometry, NullState -from psiflow.reference.reference import Reference -from psiflow.utils import TMP_COMMAND, CD_COMMAND - -logger = logging.getLogger(__name__) # logging per module - - -@typeguard.typechecked -def check_input(cp2k_input_str: str): - pass - - -@typeguard.typechecked -def str_to_dict(cp2k_input_str: str) -> dict: - return CP2KInputParserSimplified( - repeated_section_unpack=True, - # multi_value_unpack=False, - # level_reduction_blacklist=['KIND'], - ).parse(io.StringIO(cp2k_input_str)) - - -@typeguard.typechecked -def dict_to_str(cp2k_input_dict: dict) -> str: - return "\n".join(list(CP2KInputGenerator().line_iter(cp2k_input_dict))) - - -@typeguard.typechecked -def insert_atoms_in_input(cp2k_input_dict: dict, geometry: Geometry): - from ase.data import chemical_symbols - - # get rid of topology if it's there - cp2k_input_dict["force_eval"]["subsys"].pop("topology", None) - - coord = [] - cell = {} - numbers = geometry.per_atom.numbers - positions = geometry.per_atom.positions - for i in range(len(geometry)): - coord.append("{} {} {} {}".format(chemical_symbols[numbers[i]], *positions[i])) - cp2k_input_dict["force_eval"]["subsys"]["coord"] = {"*": coord} - - assert geometry.periodic # CP2K needs cell info! - for i, vector in enumerate(["A", "B", "C"]): - cell[vector] = "{} {} {}".format(*geometry.cell[i]) - cp2k_input_dict["force_eval"]["subsys"]["cell"] = cell - - -@typeguard.typechecked -def set_global_section(cp2k_input_dict: dict, properties: tuple): - if "global" not in cp2k_input_dict: - cp2k_input_dict["global"] = {} - global_dict = cp2k_input_dict["global"] - - # override low/silent print levels - level = global_dict.pop("print_level", "MEDIUM") - if level in ["SILENT", "LOW"]: - global_dict["print_level"] = "MEDIUM" - - if properties == ("energy",): - global_dict["run_type"] = "ENERGY" - elif properties == ("energy", "forces"): - global_dict["run_type"] = "ENERGY_FORCE" - else: - raise ValueError("invalid properties: {}".format(properties)) - - if "preferred_diag_library" not in global_dict: - global_dict["preferred_diag_library"] = "SL" - if "fm" not in global_dict: - global_dict["fm"] = {"type_of_matrix_multiplication": "SCALAPACK"} - - -def parse_cp2k_output( - cp2k_output_str: str, properties: tuple, geometry: Geometry -) -> Geometry: - natoms = len(geometry) - all_lines = cp2k_output_str.split("\n") - - # read coordinates - lines = None - for i, line in enumerate(all_lines): - if line.strip().startswith("MODULE QUICKSTEP: ATOMIC COORDINATES IN ANGSTROM"): - skip = 3 - lines = all_lines[i + skip : i + skip + natoms] - if lines is None: - return NullState - assert len(lines) == natoms - positions = np.zeros((natoms, 3)) - for j, line in enumerate(lines): - try: - positions[j, :] = np.array([float(f) for f in line.split()[4:7]]) - except ValueError: # if positions exploded, CP2K puts *** instead of float - return NullState - assert np.allclose( - geometry.per_atom.positions, positions, atol=1e-2 - ) # accurate up to 0.01 A - - # try and read energy - energy = None - for line in all_lines: - if line.strip().startswith("ENERGY| Total FORCE_EVAL ( QS ) energy [a.u.]"): - energy = float(line.split()[-1]) * Ha - if energy is None: - return NullState - geometry.energy = energy - geometry.per_atom.forces[:] = np.nan - - # try and read forces if requested - if "forces" in properties: - lines = None - for i, line in enumerate(all_lines): - if line.strip().startswith("ATOMIC FORCES in [a.u.]"): - skip = 3 - lines = all_lines[i + skip : i + skip + natoms] - if lines is None: - return NullState - assert len(lines) == natoms - forces = np.zeros((natoms, 3)) - for j, line in enumerate(lines): - forces[j, :] = np.array([float(f) for f in line.split()[3:6]]) - forces *= Ha / Bohr - geometry.per_atom.forces[:] = forces - geometry.stress = None - return geometry - - -def _prepare_input( - geometry: Geometry, - cp2k_input_dict: dict = {}, - properties: tuple = (), - outputs: list = [], -): - from psiflow.reference._cp2k import ( - dict_to_str, - insert_atoms_in_input, - set_global_section, - ) - - set_global_section(cp2k_input_dict, properties) - insert_atoms_in_input(cp2k_input_dict, geometry) - if "forces" in properties: - cp2k_input_dict["force_eval"]["print"] = {"FORCES": {}} - cp2k_input_str = dict_to_str(cp2k_input_dict) - with open(outputs[0], "w") as f: - f.write(cp2k_input_str) - - -prepare_input = python_app(_prepare_input, executors=["default_threads"]) - - -# typeguarding for some reason incompatible with WQ -def cp2k_singlepoint_pre( - cp2k_command: str = "", - stdout: str = "", - stderr: str = "", - inputs: list = [], - parsl_resource_specification: Optional[dict] = None, -): - cp_command = f"cp {inputs[0].filepath} cp2k.inp" - command_list = [TMP_COMMAND, CD_COMMAND, cp_command, cp2k_command] - return " && ".join(command_list) - - -@typeguard.typechecked -def cp2k_singlepoint_post( - geometry: Geometry, - properties: tuple = (), - inputs: list = [], -) -> Geometry: - from psiflow.geometry import NullState, new_nullstate - from psiflow.reference._cp2k import parse_cp2k_output - - with open(inputs[0], "r") as f: - cp2k_output_str = f.read() - geometry = parse_cp2k_output(cp2k_output_str, properties, geometry) - if geometry != NullState: - geometry.stdout = inputs[0] - else: # a little hacky - geometry = new_nullstate() - geometry.stdout = inputs[0] - return geometry - - -@typeguard.typechecked -@psiflow.serializable -class CP2K(Reference): - outputs: list - executor: str - cp2k_input_str: str - cp2k_input_dict: dict - - def __init__( - self, - cp2k_input_str: str, - executor: str = "CP2K", - outputs: Union[tuple, list] = ("energy", "forces"), - ): - self.executor = executor - check_input(cp2k_input_str) - self.cp2k_input_str = cp2k_input_str - self.cp2k_input_dict = str_to_dict(cp2k_input_str) - self.outputs = list(outputs) - self._create_apps() - - def _create_apps(self): - definition = psiflow.context().definitions[self.executor] - cp2k_command = definition.command() - wq_resources = definition.wq_resources() - app_pre = bash_app(cp2k_singlepoint_pre, executors=[self.executor]) - app_post = python_app(cp2k_singlepoint_post, executors=["default_threads"]) - - # create wrapped pre app which first parses the input file and writes it to - # disk, then call the actual bash app with the input file as a DataFuture dependency - # This is necessary because for very large structures, the size of the cp2k input - # file is too long to pass as an argument in a command line - def wrapped_app_pre(geometry, stdout: str, stderr: str): - future = prepare_input( - geometry, - cp2k_input_dict=copy.deepcopy(self.cp2k_input_dict), - properties=tuple(self.outputs), - outputs=[psiflow.context().new_file("cp2k_", ".inp")], - ) - return app_pre( - cp2k_command=cp2k_command, - stdout=stdout, - stderr=stderr, - inputs=[future.outputs[0]], - parsl_resource_specification=wq_resources, - ) - - self.app_pre = wrapped_app_pre - self.app_post = partial( - app_post, - properties=tuple(self.outputs), - ) - - def get_single_atom_references(self, element): - number = atomic_numbers[element] - references = [] - for mult in range(1, 16): - if number % 2 == 0 and mult % 2 == 0: - continue # not 2N + 1 is never even - if mult - 1 > number: - continue # max S = 2 * (N * 1/2) + 1 - cp2k_input_dict = copy.deepcopy(self.cp2k_input_dict) - cp2k_input_dict["force_eval"]["dft"]["uks"] = "TRUE" - cp2k_input_dict["force_eval"]["dft"]["multiplicity"] = mult - cp2k_input_dict["force_eval"]["dft"]["charge"] = 0 - cp2k_input_dict["force_eval"]["dft"]["xc"].pop("vdw_potential", None) - if "scf" in cp2k_input_dict["force_eval"]["dft"]: - if "ot" in cp2k_input_dict["force_eval"]["dft"]["scf"]: - cp2k_input_dict["force_eval"]["dft"]["scf"]["ot"][ - "minimizer" - ] = "CG" - else: - cp2k_input_dict["force_eval"]["dft"]["scf"]["ot"] = { - "minimizer": "CG" - } - else: - cp2k_input_dict["force_eval"]["dft"]["scf"] = { - "ot": {"minimizer": "CG"} - } - # necessary for oxygen calculation, at least in 2024.1 - key = "ignore_convergence_failure" - cp2k_input_dict["force_eval"]["dft"]["scf"][key] = "TRUE" - - reference = CP2K( - dict_to_str(cp2k_input_dict), - outputs=self.outputs, - executor=self.executor, - ) - references.append((mult, reference)) - return references diff --git a/psiflow/reference/_dftd3.py b/psiflow/reference/_dftd3.py deleted file mode 100644 index ecdd6b71..00000000 --- a/psiflow/reference/_dftd3.py +++ /dev/null @@ -1,128 +0,0 @@ -import json -from functools import partial - -import numpy as np -import typeguard -from parsl.app.app import bash_app, python_app -from parsl.dataflow.futures import AppFuture - -import psiflow -from psiflow.geometry import Geometry -from psiflow.reference.reference import Reference -from psiflow.utils.apps import copy_app_future -from psiflow.utils import TMP_COMMAND, CD_COMMAND - - -@typeguard.typechecked -def input_string(geometry: Geometry, parameters: dict, properties: tuple) -> str: - geometry_str = geometry.to_string() - data = { - "geometry": geometry_str, - "parameters": parameters, - "properties": properties, - } - return json.dumps(data) - - -def d3_singlepoint_pre( - geometry: Geometry, - parameters: dict, - properties: tuple, - d3_command: str, - stdout: str = "", - stderr: str = "", -) -> str: - from psiflow.reference._dftd3 import input_string - input_str = input_string(geometry, parameters, properties) - command_list = [ - TMP_COMMAND, - CD_COMMAND, - f"echo '{input_str}' > input.json", - f"python -u {d3_command}", - ] - return "\n".join(command_list) - - -@typeguard.typechecked -def d3_singlepoint_post( - geometry: Geometry, - inputs: list = [], -) -> Geometry: - from psiflow.geometry import new_nullstate - - with open(inputs[0], "r") as f: - lines = f.read().split("\n") - - geometry = new_nullstate() - for i, line in enumerate(lines): - if "CALCULATION SUCCESSFUL" in line: - natoms = int(lines[i + 1]) - geometry_str = "\n".join(lines[i + 1 : i + 3 + natoms]) - geometry = Geometry.from_string(geometry_str) - assert geometry.energy is not None - geometry.stdout = inputs[0] - return geometry - - -@typeguard.typechecked -@psiflow.serializable -class D3(Reference): - outputs: list # json does deserialize(serialize(tuple)) = list - executor: str - parameters: dict - - def __init__( - self, - **parameters, - ): - self.parameters = parameters - self.outputs = ["energy", "forces"] - self.executor = "default_htex" - self._create_apps() - - def _create_apps(self): - path = "psiflow.reference._dftd3" - d3_command = "$(python -c 'import {}; print({}.__file__)')".format(path, path) - app_pre = bash_app(d3_singlepoint_pre, executors=["default_htex"]) - app_post = python_app(d3_singlepoint_post, executors=["default_threads"]) - self.app_pre = partial( - app_pre, - parameters=self.parameters, - properties=tuple(self.outputs), - d3_command=d3_command, - ) - self.app_post = app_post - - def compute_atomic_energy(self, element, box_size=None) -> AppFuture: - return copy_app_future(0.0) # GPAW computes formation energy by default - - -if __name__ == "__main__": - from ase import Atoms - from dftd3.ase import DFTD3 - - with open("input.json", "r") as f: - input_dict = json.loads(f.read()) - - geometry = Geometry.from_string(input_dict["geometry"]) - parameters = input_dict["parameters"] - properties = input_dict["properties"] - - atoms = Atoms( - numbers=np.copy(geometry.per_atom.numbers), - positions=np.copy(geometry.per_atom.positions), - cell=np.copy(geometry.cell), - pbc=geometry.periodic, - ) - - calculator = DFTD3(**parameters) - atoms.calc = calculator - - if "forces" in properties: - geometry.per_atom.forces[:] = atoms.get_forces() - if "energy" in properties: - geometry.energy = atoms.get_potential_energy() - - output_str = geometry.to_string() - print("CALCULATION SUCCESSFUL") - print(output_str) diff --git a/psiflow/reference/_gpaw.py b/psiflow/reference/_gpaw.py new file mode 100644 index 00000000..e5679cf5 --- /dev/null +++ b/psiflow/reference/_gpaw.py @@ -0,0 +1,76 @@ +import json +import io + +import numpy as np +import ase.io +from ase import Atoms +from ase.calculators.mixing import SumCalculator +from ase.parallel import world + +FILEPATH = __file__ +DEFAULTS = dict(d3={}, h=0.2, minimal_box_border=2, minimal_box_multiple=4) +STDOUT_KEY = "CALCULATION SUCCESSFUL" + + +def minimal_box( + atoms: Atoms, + h: float, + border: float, + multiple: int, +) -> None: + # inspired by gpaw.cluster.Cluster + if len(atoms) == 0: + return None + min_bounds, max_bounds = np.array( + [np.minimum.reduce(atoms.positions), np.maximum.reduce(atoms.positions)] + ) + if isinstance(border, list): + b = np.array(border) + else: + b = np.array([border, border, border]) + if not hasattr(h, "__len__"): + h = np.array([h, h, h]) + min_bounds -= b + max_bounds += b - min_bounds + grid_points = np.ceil(max_bounds / h / multiple) * multiple + length_diff = grid_points * h - max_bounds + max_bounds += length_diff + min_bounds -= length_diff / 2 + shift = tuple(-1.0 * min_bounds) + atoms.translate(shift) + atoms.set_cell(tuple(max_bounds)) + + +if __name__ == "__main__": + from dftd3.ase import DFTD3 + from gpaw import GPAW as GPAWCalculator + + with open("input.json", "r") as f: + input_dict = json.loads(f.read()) + + atoms_str = io.StringIO(input_dict["geometry"]) + atoms = ase.io.read(atoms_str, format="extxyz") + + gpaw_parameters = input_dict["gpaw_parameters"] + if not all(atoms.pbc): + minimal_box( + atoms, + gpaw_parameters.get("h", 0.2), + gpaw_parameters.pop("minimal_box_border", 2), # if present, remove + gpaw_parameters.pop("minimal_box_multiple", 4), + ) + + d3 = gpaw_parameters.pop("d3", {}) + calculator = GPAWCalculator(**gpaw_parameters) + if len(d3) > 0: + calculator = SumCalculator([calculator, DFTD3(**d3)]) + + atoms.calc = calculator + if "forces" in input_dict["properties"]: + atoms.get_forces() + atoms.get_potential_energy() + + if world.rank == 0: + print(STDOUT_KEY) + ase.io.write("-", atoms, format="extxyz") + print(STDOUT_KEY) diff --git a/psiflow/reference/cp2k_.py b/psiflow/reference/cp2k_.py new file mode 100644 index 00000000..cec01f0e --- /dev/null +++ b/psiflow/reference/cp2k_.py @@ -0,0 +1,183 @@ +from __future__ import annotations # necessary for type-guarding class methods + +import copy +import io +import warnings +from typing import Optional, Union + +import numpy as np +from ase.data import chemical_symbols +from ase.units import Bohr, Ha +from cp2k_input_tools.generator import CP2KInputGenerator +from cp2k_input_tools.parser import CP2KInputParserSimplified +from parsl import File +from parsl.dataflow.futures import AppFuture + +import psiflow +from psiflow.geometry import Geometry +from psiflow.reference.reference import Reference, Status, get_spin_multiplicities +from psiflow.utils.parse import find_line, lines_to_array +from psiflow.utils import TMP_COMMAND, CD_COMMAND + + +# costly to initialise +input_parser = CP2KInputParserSimplified( + repeated_section_unpack=True, + # multi_value_unpack=False, + # level_reduction_blacklist=['KIND'], +) +input_generator = CP2KInputGenerator() + + +def str_to_dict(input_str: str) -> dict: + return input_parser.parse(io.StringIO(input_str)) + + +def dict_to_str(input_dict: dict) -> str: + return "\n".join(list(input_generator.line_iter(input_dict))) + + +def modify_input(input_dict: dict, properties: tuple) -> None: + global_dict = input_dict.setdefault("global", {}) + + # override low/silent print levels + if global_dict.get("print_level") in ["SILENT", "LOW"]: + global_dict["print_level"] = "MEDIUM" + + if "forces" in properties: + # output forces + global_dict["run_type"] = "ENERGY_FORCE" + input_dict["force_eval"]["print"] = {"FORCES": {}} + elif "energy" in properties: + global_dict["run_type"] = "ENERGY" + else: + raise ValueError("invalid properties: {}".format(properties)) + + if "preferred_diag_library" not in global_dict: + global_dict["preferred_diag_library"] = "SL" + if "fm" not in global_dict: + global_dict["fm"] = {"type_of_matrix_multiplication": "SCALAPACK"} + + +def parse_output(output_str: str, properties: tuple) -> dict[str, float | np.ndarray]: + lines = output_str.split("\n") + data = {} + + # output status + idx = find_line(lines, "CP2K", reverse=True, max_lines=250) + data["status"] = status = Status.SUCCESS if idx is not None else Status.FAILED + if status == Status.SUCCESS: + # total runtime + data["runtime"] = float(lines[idx].split()[-1]) + + # find number of atoms + idx = find_line(lines, "TOTAL NUMBERS AND MAXIMUM NUMBERS") + idx = find_line(lines, "- Atoms:", idx) + natoms = data["natoms"] = int(lines[idx].split()[-1]) + + # read coordinates + key = "MODULE QUICKSTEP: ATOMIC COORDINATES IN ANGSTROM" + idx = find_line(lines, key, idx) + 3 + data["positions"] = lines_to_array(lines[idx : idx + natoms], 4, 7) + + # read energy + key = "ENERGY| Total FORCE_EVAL ( QS ) energy [a.u.]" + idx = find_line(lines, key, idx) + data["energy"] = float(lines[idx].split()[-1]) * Ha + + if "forces" not in properties: + return data + + # read forces + key = "ATOMIC FORCES in [a.u.]" + idx = find_line(lines, key, idx) + 3 + forces = lines_to_array(lines[idx : idx + natoms], 3) + + return data | {"forces": forces * Ha / Bohr} + + +@psiflow.serializable +class CP2K(Reference): + _execute_label = "cp2k_singlepoint" + input_dict: dict + + def __init__( + self, + input_str: str, + executor: str = "CP2K", + outputs: Union[tuple, list] = ("energy", "forces"), + ): + self.executor = executor + self.outputs = tuple(outputs) + self.input_dict = str_to_dict(input_str) + modify_input(self.input_dict, outputs) + self._create_apps() + + def compute_atomic_energy(self, element, box_size=None) -> AppFuture[float]: + assert box_size, "CP2K expects a periodic box." + return super().compute_atomic_energy(element, box_size) + + def get_single_atom_references(self, element: str) -> dict[int, Reference]: + input_dict = copy.deepcopy(self.input_dict) + input_section = input_dict["force_eval"]["dft"] + input_section |= {"uks": "TRUE", "charge": 0, "multiplicity": "{mult}"} + input_section["xc"].pop("vdw_potential", None) # no dispersion + if "scf" in input_section: + if "ot" in input_section["scf"]: + input_section["scf"]["ot"]["minimizer"] = "CG" + else: + input_section["scf"]["ot"] = {"minimizer": "CG"} + else: + input_section["scf"] = {"ot": {"minimizer": "CG"}} + # necessary for oxygen calculation, at least in 2024.1 + input_section["scf"]["ignore_convergence_failure"] = "TRUE" + input_str = dict_to_str(input_dict) + + references = {} + for mult in get_spin_multiplicities(element): + references[mult] = CP2K( + input_str.format(mult=mult), + outputs=self.outputs, + executor=self.executor, + ) + return references + + def get_shell_command(self, inputs: list[File]) -> str: + command_list = [ + TMP_COMMAND, + CD_COMMAND, + f"cp {inputs[0].filepath} cp2k.inp", + self.execute_command, + ] + return "\n".join(command_list) + + def parse_output(self, stdout: str) -> dict: + return parse_output(stdout, self.outputs) + + def create_input(self, geom: Geometry) -> tuple[bool, Optional[File]]: + if not geom.periodic: + msg = "CP2K expects periodic boundary conditions, skipping geometry" + warnings.warn(msg) + return False, None + + section = self.input_dict["force_eval"]["subsys"] + section_copy = copy.deepcopy(section) + section.pop("topology", None) # remove topology section + + # insert geometry and cell + symbols = np.array(chemical_symbols)[geom.per_atom.numbers] + coord = [ + f"{s:5} {p[0]:<15.8f} {p[1]:<15.8f} {p[2]:<15.8f}" + for s, p in zip(symbols, geom.per_atom.positions) + ] + section["coord"] = {"*": coord} + cell = {} + for i, vector in enumerate(["A", "B", "C"]): + cell[vector] = "{} {} {}".format(*geom.cell[i]) + section["cell"] = cell + + with open(file := psiflow.context().new_file("cp2k_", ".inp"), "w") as f: + f.write(dict_to_str(self.input_dict)) + + self.input_dict["force_eval"]["subsys"] = section_copy # revert changes + return True, File(file) diff --git a/psiflow/reference/dummy.py b/psiflow/reference/dummy.py new file mode 100644 index 00000000..d175c9d3 --- /dev/null +++ b/psiflow/reference/dummy.py @@ -0,0 +1,56 @@ +from __future__ import annotations # necessary for type-guarding class methods + +from typing import Optional, Union +from functools import partial + +import numpy as np +from parsl import File, bash_app, python_app + +import psiflow +from psiflow.geometry import Geometry +from psiflow.reference.reference import Reference, Status +from psiflow.reference.reference import _execute, _process_output + + +@psiflow.serializable +class ReferenceDummy(Reference): + _execute_label = "dummy_singlepoint" + + def __init__(self, outputs: Union[tuple, list] = ("energy", "forces")): + self.outputs = outputs + self._create_apps() + + def _create_apps(self): + # psiflow.context().definitions does not contain "default_htex" + self.execute_command = "" + self.app_pre = self.create_input + self.app_execute = partial( + bash_app(_execute, executors=["default_htex"]), + reference=self, + parsl_resource_specification={}, + label=self._execute_label, + ) + self.app_post = partial( + python_app(_process_output, executors=["default_threads"]), + reference=self, + ) + + def get_shell_command(self, inputs: list[File]) -> str: + return f"cat {inputs[0].filepath}" + + def parse_output(self, stdout: str) -> dict: + geom = Geometry.from_string(stdout) + data = { + "status": Status.SUCCESS, + "positions": geom.per_atom.positions, + "natoms": len(geom), + "energy": np.random.uniform(), + } + if "forces" in self.outputs: + data["forces"] = np.random.uniform(size=(len(geom), 3)) + return data + + def create_input(self, geom: Geometry) -> tuple[bool, File]: + with open(file := psiflow.context().new_file("dummy_", ".inp"), "w") as f: + f.write(geom.to_string()) + return True, File(file) diff --git a/psiflow/reference/gpaw_.py b/psiflow/reference/gpaw_.py index c74b595c..41e4c58b 100644 --- a/psiflow/reference/gpaw_.py +++ b/psiflow/reference/gpaw_.py @@ -1,175 +1,82 @@ +from __future__ import annotations # necessary for type-guarding class methods + import json -from functools import partial -from typing import Union +from pathlib import Path +from typing import Optional, Union -import numpy as np -import typeguard -from parsl.app.app import bash_app, python_app +from parsl import File from parsl.dataflow.futures import AppFuture import psiflow -from psiflow.geometry import Geometry, new_nullstate -from psiflow.reference.reference import Reference -from psiflow.utils.apps import copy_app_future +from psiflow.geometry import Geometry +from psiflow.reference.reference import Reference, Status from psiflow.utils import TMP_COMMAND, CD_COMMAND +from psiflow.utils.apps import copy_app_future +from psiflow.utils.parse import find_line +from psiflow.reference._gpaw import FILEPATH, DEFAULTS, STDOUT_KEY -@typeguard.typechecked -def input_string(geometry: Geometry, gpaw_parameters: dict, properties: tuple) -> str: - geometry_str = geometry.to_string() +def parse_output(stdout: str, properties: tuple[str, ...]) -> dict: + lines = stdout.split("\n") + idx_start = find_line(lines, STDOUT_KEY) + 1 + idx_stop = find_line(lines, STDOUT_KEY, idx_start) + txt = "\n".join(lines[idx_start:idx_stop]) + geom = Geometry.from_string(txt) + idx = find_line(lines, "Total:", reverse=True) data = { - "geometry": geometry_str, - "gpaw_parameters": gpaw_parameters, - "properties": properties, + "status": Status.SUCCESS, + "runtime": lines[idx].split()[-2], + "positions": geom.per_atom.positions, + "natoms": len(geom), + "energy": geom.energy, } - return json.dumps(data) - - -def gpaw_singlepoint_pre( - geometry: Geometry, - gpaw_parameters: dict, - properties: tuple, - gpaw_command: str, - parsl_resource_specification: dict = {}, - stdout: str = "", - stderr: str = "", -) -> str: - from psiflow.reference.gpaw_ import input_string - input_str = input_string(geometry, gpaw_parameters, properties) - write_command = f"echo '{input_str}' > input.json" - command_list = [ - TMP_COMMAND, - CD_COMMAND, - write_command, - gpaw_command, - ] - return "\n".join(command_list) - - -@typeguard.typechecked -def gpaw_singlepoint_post( - geometry: Geometry, - inputs: list = [], -) -> Geometry: - with open(inputs[0], "r") as f: - lines = f.read().split("\n") - - geometry = new_nullstate() # GPAW parsing doesn't require initial geometry - for i, line in enumerate(lines): - if "CALCULATION SUCCESSFUL" in line: - natoms = int(lines[i + 1]) - geometry_str = "\n".join(lines[i + 1 : i + 3 + natoms]) - geometry = Geometry.from_string(geometry_str) - assert geometry.energy is not None - geometry.stdout = inputs[0] - return geometry + if "forces" in properties: + data["forces"] = geom.per_atom.forces + return data -@typeguard.typechecked @psiflow.serializable class GPAW(Reference): - outputs: list # json does deserialize(serialize(tuple)) = list - executor: str + _execute_label = "gpaw_singlepoint" parameters: dict + script: str def __init__( self, + parameters: dict, + script: str | Path = FILEPATH, outputs: Union[tuple, list] = ("energy", "forces"), executor: str = "GPAW", - **parameters, ): - self.outputs = list(outputs) + self.outputs = tuple(outputs) self.parameters = parameters + assert (script := Path(script)).is_file() + self.script = str(script.resolve()) # absolute path self.executor = executor self._create_apps() - def _create_apps(self): - definition = psiflow.context().definitions[self.executor] - gpaw_command = definition.command() - wq_resources = definition.wq_resources() - app_pre = bash_app(gpaw_singlepoint_pre, executors=[self.executor]) - app_post = python_app(gpaw_singlepoint_post, executors=["default_threads"]) - self.app_pre = partial( - app_pre, - gpaw_parameters=self.parameters, - properties=tuple(self.outputs), - gpaw_command=gpaw_command, - parsl_resource_specification=wq_resources, - ) - self.app_post = app_post - def compute_atomic_energy(self, element, box_size=None) -> AppFuture: return copy_app_future(0.0) # GPAW computes formation energy by default - -if __name__ == "__main__": - from ase import Atoms - from ase.calculators.mixing import SumCalculator - from ase.parallel import world - from dftd3.ase import DFTD3 - from gpaw import GPAW as GPAWCalculator - - def minimal_box( - atoms: Atoms, - border: float = 0.0, - h: float = 0.2, - multiple: int = 4, - ) -> None: - # inspired by gpaw.cluster.Cluster - if len(atoms) == 0: - return None - min_bounds, max_bounds = np.array( - [np.minimum.reduce(atoms.positions), np.maximum.reduce(atoms.positions)] - ) - if isinstance(border, list): - b = np.array(border) - else: - b = np.array([border, border, border]) - if not hasattr(h, "__len__"): - h = np.array([h, h, h]) - min_bounds -= b - max_bounds += b - min_bounds - grid_points = np.ceil(max_bounds / h / multiple) * multiple - length_diff = grid_points * h - max_bounds - max_bounds += length_diff - min_bounds -= length_diff / 2 - shift = tuple(-1.0 * min_bounds) - atoms.translate(shift) - atoms.set_cell(tuple(max_bounds)) - - with open("input.json", "r") as f: - input_dict = json.loads(f.read()) - - geometry = Geometry.from_string(input_dict["geometry"]) - gpaw_parameters = input_dict["gpaw_parameters"] - properties = input_dict["properties"] - d3 = gpaw_parameters.pop("d3", {}) - - atoms = Atoms( - numbers=np.copy(geometry.per_atom.numbers), - positions=np.copy(geometry.per_atom.positions), - cell=np.copy(geometry.cell), - pbc=geometry.periodic, - ) - if not geometry.periodic: - minimal_box( - atoms, - gpaw_parameters.get("h", 0.2), - gpaw_parameters.pop("minimal_box_border", 2), # if present, remove - gpaw_parameters.pop("minimal_box_multiple", 4), - ) - - calculator = GPAWCalculator(**gpaw_parameters) - if len(d3) > 0: - calculator = SumCalculator([calculator, DFTD3(**d3)]) - atoms.calc = calculator - - if "forces" in properties: - geometry.per_atom.forces[:] = atoms.get_forces() - if "energy" in properties: - geometry.energy = atoms.get_potential_energy() - - output_str = geometry.to_string() - if world.rank == 0: - print("CALCULATION SUCCESSFUL") - print(output_str) + def get_shell_command(self, inputs: list[File]) -> str: + command_list = [ + TMP_COMMAND, + CD_COMMAND, + f"cp {inputs[0].filepath} input.json", + f"cp {self.script} script_gpaw.py", + self.execute_command, + ] + return "\n".join(command_list) + + def parse_output(self, stdout: str) -> dict: + return parse_output(stdout, self.outputs) + + def create_input(self, geom: Geometry) -> tuple[bool, File]: + data = { + "geometry": geom.to_string(), + "gpaw_parameters": self.parameters, + "properties": self.outputs, + } + with open(file := psiflow.context().new_file("gpaw_", ".json"), "w") as f: + f.write(json.dumps(data)) + return True, File(file) diff --git a/psiflow/reference/orca.py b/psiflow/reference/orca.py deleted file mode 100644 index e69de29b..00000000 diff --git a/psiflow/reference/orca_.py b/psiflow/reference/orca_.py new file mode 100644 index 00000000..71f4338f --- /dev/null +++ b/psiflow/reference/orca_.py @@ -0,0 +1,189 @@ +from __future__ import annotations # necessary for type-guarding class methods + +import warnings +import re +from functools import partial +from typing import Optional, Union + +import ase.symbols +import numpy as np +from ase.units import Bohr, Ha +from parsl import File +from parsl.dataflow.futures import AppFuture + +import psiflow +from psiflow.geometry import Geometry +from psiflow.reference.reference import Reference, Status, get_spin_multiplicities +from psiflow.utils import TMP_COMMAND, CD_COMMAND +from psiflow.utils.parse import find_line, lines_to_array, string_to_timedelta + + +KEY_GHOST = "ghost" +KEY_DUMMY = 0 +SYMBOLS = np.array(ase.symbols.chemical_symbols) +SYMBOLS[KEY_DUMMY] = "DA" +DEFAULT_KWARGS = dict(charge=0, multiplicity=1) + + +def format_block(key: str, keywords: list[str]) -> str: + """""" + return "\n".join([f"%{key}", *[f"\t{s}" for s in keywords], "end"]) + + +def create_input_template( + task: str = "EnGrad", + method: str = "HF", + basis: str = "def2-SVP", + comment: str = "", + keywords: list[str] = (), + blocks: dict[str, list[str]] = None, +) -> str: + """""" + blocks = blocks or {} + lines = [ + f"# {comment}", + f'! {task} {method} {basis} {" ".join(keywords)}', + *[format_block(k, v) for k, v in blocks.items()], + ] + return "\n".join(lines) + + +def format_coord(geom: Geometry) -> str: + """""" + # TODO: ghost atoms? + symbols = SYMBOLS[geom.per_atom.numbers] + # if KEY_GHOST in atoms.arrays: + # for idx in np.flatnonzero(atoms.arrays[KEY_GHOST]): + # symbols[idx] = f"{symbols[idx]}:" + data = [ + f"{s:5} {p[0]:<15.8f} {p[1]:<15.8f} {p[2]:<15.8f}" + for s, p in zip(symbols, geom.per_atom.positions) + ] + return "\n".join(data) + + +def check_input(input_template: str, properties: tuple[str, ...]) -> str: + """""" + research = partial(re.search, string=input_template, flags=re.IGNORECASE) + + compute_forces = bool(research("Engrad") or research("Numgrad")) + if compute_forces and "forces" not in properties: + msg = "ORCA input asks for gradients, but 'forces' not in outputs." + warnings.warn(msg) + elif not compute_forces and "forces" in properties: + msg = "ORCA input does not ask for gradients, but 'forces' in outputs." + warnings.warn(msg) + + # add some placeholder blocks + lines = [input_template] + if not research("%maxcore"): + lines.append("%maxcore {memory}") + if not research("%pal"): + lines.append(format_block("pal", ["nprocs {cores}"])) + if not research("\*xyz"): + lines.append("*xyz {charge} {multiplicity}\n{coord}\n*") + + return "\n".join(lines) + + +def parse_output(stdout: str, properties: tuple[str, ...]) -> dict: + lines = stdout.split("\n") + data = {} + + # output status + line = "****ORCA TERMINATED NORMALLY****" + idx = find_line(lines, line, reverse=True, max_lines=5) + data["status"] = status = Status.SUCCESS if idx is not None else Status.FAILED + if status == Status.SUCCESS: + # total runtime + idx = find_line(lines, "TOTAL RUN TIME", reverse=True, max_lines=5) + data["runtime"] = string_to_timedelta(lines[idx][16:]) + + # read coordinates + idx_start = idx = find_line(lines, "CARTESIAN COORDINATES (ANGSTROEM)") + 2 + idx_stop = idx = find_line(lines, "---", idx) - 1 + positions = lines_to_array(lines[idx_start:idx_stop], start=1, stop=4) + data["positions"], data["natoms"] = positions, positions.shape[0] + + # read energy + # TODO: not exactly equal to log file - different conversion factor? + idx = find_line(lines, "FINAL SINGLE POINT ENERGY", idx) + data["energy"] = float(lines[idx].split()[-1]) * Ha + + if "forces" not in properties: + return data + + # read forces + idx_start = idx = find_line(lines, "CARTESIAN GRADIENT", idx) + 3 + idx_stop = find_line(lines, "Difference", idx) - 1 + forces = lines_to_array(lines[idx_start:idx_stop], start=3, stop=6) + data["forces"] = forces * Ha / Bohr + + return data + + +@psiflow.serializable +class ORCA(Reference): + _execute_label = "orca_singlepoint" + input_template: str + input_kwargs: dict + + def __init__( + self, + input_template: str, + executor: str = "ORCA", + outputs: Union[tuple, list] = ("energy", "forces"), + ): + self.executor = executor + self.input_template = check_input(input_template, outputs) + self.input_kwargs = DEFAULT_KWARGS.copy() # TODO: user control? + self.outputs = tuple(outputs) + self._create_apps() + + def _create_apps(self): + super()._create_apps() + definition = psiflow.context().definitions[self.executor] + wq_resources = definition.wq_resources() + cores, memory = wq_resources["cores"], wq_resources["memory"] + self.input_kwargs |= {"cores": cores, "memory": memory // cores} # in MB + + def compute_atomic_energy(self, element, box_size=None) -> AppFuture[float]: + assert box_size is None, "ORCA does not do PBC" + return super().compute_atomic_energy(element) + + def get_single_atom_references(self, element: str) -> dict[int, Reference]: + # TODO: this is not properly tested - special options? + # ORCA automatically switches to UHF/UKS + references = {} + for mult in get_spin_multiplicities(element): + ref = ORCA( + self.input_template, outputs=self.outputs, executor=self.executor + ) + ref.input_kwargs["multiplicity"] = mult + references[mult] = ref + return references + + def get_shell_command(self, inputs: list[File]) -> str: + command_list = [ + TMP_COMMAND, + CD_COMMAND, + f"cp {inputs[0].filepath} orca.inp", + self.execute_command, + ] + return "\n".join(command_list) + + def parse_output(self, stdout: str) -> dict: + return parse_output(stdout, self.outputs) + + def create_input(self, geom: Geometry) -> tuple[bool, Optional[File]]: + if geom.periodic: + msg = "ORCA does not support periodic boundary conditions" + warnings.warn(msg) + return False, None + + input_str = self.input_template.format( + coord=format_coord(geom), **self.input_kwargs + ) + with open(file := psiflow.context().new_file("orca_", ".inp"), "w") as f: + f.write(input_str) + return True, File(file) diff --git a/psiflow/reference/reference.py b/psiflow/reference/reference.py index 8c9d83c3..b6ed9321 100644 --- a/psiflow/reference/reference.py +++ b/psiflow/reference/reference.py @@ -1,156 +1,236 @@ from __future__ import annotations # necessary for type-guarding class methods -import logging -from typing import ClassVar, Optional, Union +import warnings +from typing import ClassVar, Optional, Union, Callable, Sequence +from pathlib import Path +from functools import partial +from enum import Enum import numpy as np import parsl -import typeguard from ase.data import atomic_numbers -from parsl.app.app import join_app, python_app +from parsl import python_app, join_app, bash_app, File from parsl.dataflow.futures import AppFuture import psiflow from psiflow.data import Computable, Dataset +from psiflow.data.utils import extract_quantities from psiflow.geometry import Geometry, NullState -from psiflow.utils.apps import copy_app_future, unpack_i +from psiflow.utils.apps import copy_app_future, setup_logger +from psiflow.utils.parse import LineNotFoundError -logger = logging.getLogger(__name__) # logging per module +logger = setup_logger(__name__) # logging per module -@typeguard.typechecked -def _extract_energy(state: Geometry): - if state.energy is None: - return 1e10 - else: - return state.energy +class Status(Enum): + SUCCESS = 0 + FAILED = 1 + INCONSISTENT = 2 -extract_energy = python_app(_extract_energy, executors=["default_threads"]) +class SinglePointResult: + """All dict keys update_geometry understands""" -@join_app -@typeguard.typechecked -def get_minimum_energy(element, configs, *energies): - logger.info("atomic energies for element {}:".format(element)) - for config, energy in zip(configs, energies): - logger.info("\t{} eV; ".format(energy) + str(config)) - energy = min(energies) - assert not energy == 1e10, "atomic energy calculation of {} failed".format(element) - return copy_app_future(energy) + status: Status + natoms: int # optional + positions: np.ndarray + energy: float + forces: np.ndarray # optional + stdout: Path + stderr: Path # optional + runtime: float # optional -@typeguard.typechecked -def _nan_if_unsuccessful( - geometry: Geometry, - result: Geometry, -) -> Geometry: - if result == NullState: - geometry.energy = None - geometry.per_atom.forces[:] = np.nan - geometry.per_atom.stress = None - geometry.stdout = result.stdout - return geometry - else: - return result +def update_geometry(geom: Geometry, data: dict) -> Geometry: + """""" + _, task_id, task_name = data["stdout"].stem.split("_", maxsplit=2) + logger.info(f'Task "{task_name}" (ID {task_id}): {data["status"].name}') + geom = geom.copy() + geom.reset() + geom.order["status"], geom.order["task_id"] = data["status"].name, task_id + if data["status"] != Status.SUCCESS: + return geom + geom.order["runtime"] = data.get("runtime") -nan_if_unsuccessful = python_app(_nan_if_unsuccessful, executors=["default_threads"]) + shift = data["positions"][0] - geom.per_atom.positions[0] + if not np.allclose(data["positions"], geom.per_atom.positions + shift, atol=1e-6): + # output does not match geometry up to a translation + geom.order["status"] = Status.INCONSISTENT + return geom + + geom.energy = data["energy"] + if "forces" in data: + geom.per_atom.forces[:] = data["forces"] + return geom @join_app -@typeguard.typechecked -def evaluate( - geometry: Geometry, - reference: Reference, -) -> AppFuture: - if geometry == NullState: - return copy_app_future(NullState) - else: - future = reference.app_pre( - geometry, - stdout=parsl.AUTO_LOGNAME, - stderr=parsl.AUTO_LOGNAME, - ) - result = reference.app_post( - geometry=geometry.copy(), - inputs=[future.stdout, future.stderr, future], - ) - return nan_if_unsuccessful(geometry, result) +def get_minimum_energy(element: str, **kwargs) -> AppFuture[float]: + energies = {m: state.energy or np.inf for m, state in kwargs.items()} + energy = min(energies.values()) + + logger.info(f"Atomic energies for element {element}") + for m, energy in energies.items(): + logger.info(f"\tMultiplicity {m}:{energy:>10.4f} eV") + assert not np.isinf(energy), f"Atomic energy calculation of '{element}' failed" + return copy_app_future(energy) @join_app -@typeguard.typechecked def compute_dataset( dataset: Dataset, length: int, reference: Reference, -) -> AppFuture: - from psiflow.data.utils import extract_quantities - +) -> list[AppFuture[Geometry]]: + logger.info(f"Performing {length} {reference.__class__.__name__} calculations.") geometries = dataset.geometries() # read it once - evaluated = [evaluate(unpack_i(geometries, i), reference) for i in range(length)] - future = extract_quantities( - tuple(reference.outputs), - None, - None, - *evaluated, + evaluated = [reference.evaluate(geometries[i]) for i in range(length)] + return evaluated + + +def _execute( + reference: Reference, + inputs: list[File], + parsl_resource_specification: Optional[dict] = None, + stdout: str = parsl.AUTO_LOGNAME, + stderr: str = parsl.AUTO_LOGNAME, + label: str = "singlepoint", +) -> str: + return reference.get_shell_command(inputs) + + +def _process_output( + reference: Reference, + geom: Geometry, + inputs: tuple[str | int] = (), +) -> Geometry: + """""" + stdout = Path(inputs[0]).read_text() + try: + data = reference.parse_output(stdout) + except LineNotFoundError: + # TODO: find out what went wrong + data = {"status": Status.FAILED} + data |= {"stdout": Path(inputs[0]), "stderr": Path(inputs[1])} + return update_geometry(geom, data) + + +@join_app +def evaluate(reference: Reference, geom: Geometry) -> AppFuture[Geometry]: + """""" + if geom == NullState: + warnings.warn("Skipping NullState..") + return copy_app_future(geom) + execute, *files = reference.app_pre(geom=geom) + if not execute: # TODO: should we reset geom? + return copy_app_future(geom) + future = reference.app_execute(inputs=files) + future = reference.app_post( + geom=geom, inputs=[future.stdout, future.stderr, future] # wait for future ) return future -@typeguard.typechecked @psiflow.serializable class Reference(Computable): - outputs: tuple - batch_size: ClassVar[int] = 1 # not really used + outputs: Union[list[str], tuple[str, ...]] + batch_size: ClassVar[int] = 1 # TODO: not really used + executor: str + app_pre: ClassVar[Callable] # TODO: fix serialisation + app_execute: ClassVar[Callable] + app_post: ClassVar[Callable] + _execute_label: ClassVar[str] + execute_command: str def compute( self, arg: Union[Dataset, Geometry, AppFuture, list], *outputs: Optional[Union[str, tuple]], ): + for output in outputs: + if output not in self.outputs: + raise ValueError("output {} not in {}".format(output, self.outputs)) + + # TODO: convert_to_dataset util? if isinstance(arg, Dataset): dataset = arg elif isinstance(arg, list): dataset = Dataset(arg) elif isinstance(arg, AppFuture) or isinstance(arg, Geometry): dataset = Dataset([arg]) - compute_outputs = compute_dataset(dataset, dataset.length(), self) - if len(outputs) == 0: - outputs_ = tuple(self.outputs) - else: - outputs_ = outputs - to_return = [] - for output in outputs_: - if output not in self.outputs: - raise ValueError("output {} not in {}".format(output, self.outputs)) - index = self.outputs.index(output) - to_return.append(compute_outputs[index]) - if len(outputs_) == 1: - return to_return[0] else: - return to_return + raise TypeError + + # TODO: writing to dataset first is wasted overhead, + # but extract_quantities does not accept AppFuture[list[Geometry]] (yet?) + future_dataset = self.compute_dataset(dataset) + outputs = outputs or self.outputs + future_data = extract_quantities( + tuple(outputs), None, None, inputs=[future_dataset.extxyz] + ) + if len(outputs) == 1: + return future_data[0] + return [future_data[_] for _ in range(len(outputs))] + + def compute_dataset(self, dataset: Dataset) -> Dataset: + future = compute_dataset(dataset, dataset.length(), self) + return Dataset(future) + + def _create_apps(self): + definition = psiflow.context().definitions[self.executor] + self.execute_command = definition.command() + wq_resources = definition.wq_resources() + self.app_pre = self.create_input + self.app_execute = partial( + bash_app(_execute, executors=[self.executor]), + reference=self, + parsl_resource_specification=wq_resources, + label=self._execute_label, + ) + self.app_post = partial( + python_app(_process_output, executors=["default_threads"]), + reference=self, + ) + + def evaluate(self, geometry: Geometry | AppFuture) -> AppFuture: + return evaluate(self, geometry) - def compute_atomic_energy(self, element, box_size=None): - energies = [] + def compute_atomic_energy(self, element, box_size=None) -> AppFuture[float]: references = self.get_single_atom_references(element) - configs = [c for c, _ in references] - if box_size is not None: - state = Geometry.from_data( - numbers=np.array([atomic_numbers[element]]), - positions=np.array([[0, 0, 0]]), - cell=np.eye(3) * box_size, - ) - else: - state = Geometry( - numbers=np.array([atomic_numbers[element]]), - positions=np.array([[0, 0, 0]]), - cell=np.zeros((3, 3)), - ) - for _, reference in references: - energies.append(extract_energy(evaluate(state, reference))) - return get_minimum_energy(element, configs, *energies) - - def get_single_atom_references(self, element): - return [(None, self)] + state = Geometry.from_data( + numbers=np.array([atomic_numbers[element]]), + positions=np.array([[0, 0, 0]]), + cell=np.eye(3) * (box_size or 0), + ) + futures = {} + for mult, reference in references.items(): + futures[str(mult)] = reference.evaluate(state) + return get_minimum_energy(element, **futures) + + def get_single_atom_references(self, element: str) -> dict[int, Reference]: + raise NotImplementedError + + def get_shell_command(self, inputs: list[File]) -> str: + raise NotImplementedError + + def parse_output(self, stdout: str): + raise NotImplementedError + + def create_input(self, geom: Geometry) -> tuple[bool, File, ...]: + raise NotImplementedError + + +def get_spin_multiplicities(element: str) -> list[int]: + """TODO: rethink this""" + # max S = N * 1/2, max mult = 2 * S + 1 + from ase.symbols import atomic_numbers + + mults = [] + number = atomic_numbers[element] + for mult in range(1, min(number + 2, 16)): + if number % 2 == 0 and mult % 2 == 0: + continue # S always whole, mult never even + mults.append(mult) + return mults diff --git a/psiflow/serialization.py b/psiflow/serialization.py index 882f153e..cf863259 100644 --- a/psiflow/serialization.py +++ b/psiflow/serialization.py @@ -269,7 +269,7 @@ def deserialize(data_str: str, custom_cls: Optional[list] = None): from psiflow.metrics import Metrics from psiflow.models import MACE from psiflow.order_parameters import OrderParameter - from psiflow.reference import CP2K, D3, GPAW + from psiflow.reference import CP2K, GPAW, ORCA from psiflow.sampling import Metadynamics, ReplicaExchange, SimulationOutput, Walker SERIALIZABLES = {} @@ -280,7 +280,7 @@ def deserialize(data_str: str, custom_cls: Optional[list] = None): MACE, CP2K, GPAW, - D3, + ORCA, Zero, MACEHamiltonian, EinsteinCrystal, diff --git a/psiflow/utils/apps.py b/psiflow/utils/apps.py index 74c9c844..0f52bdd4 100644 --- a/psiflow/utils/apps.py +++ b/psiflow/utils/apps.py @@ -33,7 +33,7 @@ def _multiply(a, b): @typeguard.typechecked -def setup_logger(module_name): +def setup_logger(module_name: str, level=logging.INFO) -> logging.Logger: # Create logger instance for the module module_logger = logging.getLogger(module_name) @@ -48,7 +48,7 @@ def setup_logger(module_name): module_logger.addHandler(stdout_handler) # Set the logging level for the logger - module_logger.setLevel(logging.INFO) + module_logger.setLevel(level) return module_logger @@ -113,7 +113,7 @@ def _log_message(logger, message, *futures): def _pack(*args): - return args + return args # TODO: _combine_futures? pack = python_app(_pack, executors=["default_threads"]) diff --git a/psiflow/utils/io.py b/psiflow/utils/io.py index 1e0f4584..80dc475a 100644 --- a/psiflow/utils/io.py +++ b/psiflow/utils/io.py @@ -82,6 +82,7 @@ def _save_txt(data: str, outputs: list[File] = []) -> None: @typeguard.typechecked def _load_metrics(inputs: list = []) -> np.recarray: + # TODO: stop using recarrays return np.load(inputs[0], allow_pickle=True) @@ -90,6 +91,7 @@ def _load_metrics(inputs: list = []) -> np.recarray: @typeguard.typechecked def _save_metrics(data: np.recarray, outputs: list = []) -> None: + # TODO: stop using recarrays with open(outputs[0], "wb") as f: data.dump(f) diff --git a/psiflow/utils/parse.py b/psiflow/utils/parse.py new file mode 100644 index 00000000..f60a77a5 --- /dev/null +++ b/psiflow/utils/parse.py @@ -0,0 +1,58 @@ +import datetime +from pathlib import Path + +import numpy as np + +from psiflow.execution import PSIFLOW_INTERNAL + + +class LineNotFoundError(Exception): + """Call to find_line failed""" + + pass + + +def find_line( + lines: list[str], + line: str, + idx_start: int = 0, + max_lines: int = int(1e6), + reverse: bool = False, +) -> int: + """""" + if not reverse: + idx_slice = slice(idx_start, idx_start + max_lines) + else: + idx_start = idx_start or len(lines) - 1 + idx_slice = slice(idx_start, idx_start - max_lines, -1) + for i, l in enumerate(lines[idx_slice]): + if l.strip().startswith(line): + if not reverse: + return idx_start + i + else: + return idx_start - i + raise LineNotFoundError('Could not find line starting with "{}".'.format(line)) + + +def lines_to_array( + lines: list[str], start: int = 0, stop: int = int(1e6), dtype: np.dtype = float +) -> np.ndarray: + """""" + return np.array([line.split()[start:stop] for line in lines], dtype=dtype) + + +def string_to_timedelta(timedelta: str) -> datetime.timedelta: + """""" + allowed_units = "weeks", "days", "hours", "minutes", "seconds" + time_list = timedelta.split() + values, units = time_list[:-1:2], time_list[1::2] + kwargs = {u: float(v) for u, v in zip(units, values) if u in allowed_units} + return datetime.timedelta(**kwargs) + + +def get_task_logs(task_id: int) -> tuple[Path, Path]: + """""" + path = Path.cwd().resolve() / PSIFLOW_INTERNAL / "000/task_logs" # TODO + stdout = next(path.rglob(f"task_{task_id}_*.stdout")) + stderr = next(path.rglob(f"task_{task_id}_*.stderr")) + return stdout, stderr diff --git a/pyproject.toml b/pyproject.toml index b8c00b55..4d7092f1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -75,8 +75,9 @@ log_cli = 0 addopts = [ "--basetemp=pytest-tmp", # /tmp/ may be different for each worker! "--import-mode=append", - "--psiflow-config=configs/threadpool.yaml", + "--psiflow-config=configs/local_test.yaml", "-W ignore::DeprecationWarning", "--log-cli-level=WARNING", + "--show-capture=stdout" ] testpaths = ["tests"] diff --git a/tests/conftest.py b/tests/conftest.py index 97ad51eb..5ed9039b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -128,9 +128,7 @@ def dataset_h2(context): ) data = [h2.copy() for i in range(20)] for atoms in data: - atoms.set_positions( - atoms.get_positions() + np.random.uniform(-0.05, 0.05, size=(2, 3)) - ) + atoms.positions += np.random.uniform(-0.05, 0.05, size=(2, 3)) return Dataset([Geometry.from_atoms(a) for a in data]) diff --git a/tests/test_function.py b/tests/test_function.py index 8ff9da13..e9bcacf3 100644 --- a/tests/test_function.py +++ b/tests/test_function.py @@ -1,8 +1,8 @@ import json import numpy as np -from ase.units import kJ, mol # type: ignore -from parsl.data_provider.files import File # type: ignore +from ase.units import kJ, mol # type: ignore +from parsl.data_provider.files import File # type: ignore import psiflow from psiflow.data import Dataset @@ -11,6 +11,7 @@ HarmonicFunction, MACEFunction, PlumedFunction, + DispersionFunction, function_from_json, ) from psiflow.hamiltonians import ( @@ -18,6 +19,7 @@ Harmonic, MixtureHamiltonian, PlumedHamiltonian, + D3Hamiltonian, Zero, ) from psiflow.utils._plumed import remove_comments_printflush, set_path_in_plumed @@ -214,6 +216,20 @@ def test_harmonic_function(dataset): assert np.allclose(forces.result(), forces_) +def test_dispersion_function(dataset): + hamiltonian = D3Hamiltonian(method="pbe", damping="d3bj") + energy = hamiltonian.compute(dataset[-1], "energy").result() + assert energy is not None + assert energy < 0.0 # dispersion is attractive + + subset = dataset[:3] + data = subset.evaluate(hamiltonian) + energy, forces = hamiltonian.compute(subset, "energy", "forces") + + assert np.allclose(data.get("energy").result(), energy.result()) + assert len(forces.result().shape) == 3 + + def test_hamiltonian_arithmetic(dataset): hamiltonian = EinsteinCrystal(dataset[0], force_constant=1.0) hamiltonian_ = EinsteinCrystal(dataset[0].result(), force_constant=1.1) @@ -257,9 +273,9 @@ def test_hamiltonian_arithmetic(dataset): geometries = [dataset[i].result() for i in [0, -1]] natoms = [len(geometry) for geometry in geometries] - forces = zero.compute(geometries, 'forces', batch_size=1).result() - assert np.all(forces[0, :natoms[0]] == 0.0) - assert np.all(forces[-1, :natoms[1]] == 0.0) + forces = zero.compute(geometries, "forces", batch_size=1).result() + assert np.all(forces[0, : natoms[0]] == 0.0) + assert np.all(forces[-1, : natoms[1]] == 0.0) def test_subtract(dataset): @@ -424,4 +440,3 @@ def test_function_from_json(tmp_path, dataset): energies = hamiltonian.compute(dataset, "energy").result() energies_ = function.compute(dataset.geometries().result())["energy"] assert np.allclose(energies, energies_) - diff --git a/tests/test_learning.py b/tests/test_learning.py index b45d53b6..e450a978 100644 --- a/tests/test_learning.py +++ b/tests/test_learning.py @@ -7,7 +7,7 @@ from psiflow.hamiltonians import EinsteinCrystal from psiflow.learning import Learning, evaluate_outputs from psiflow.metrics import Metrics, _create_table, parse_walker_log, reconstruct_dtypes -from psiflow.reference import D3 +from psiflow.reference import ReferenceDummy from psiflow.sampling import SimulationOutput, Walker from psiflow.utils.apps import combine_futures from psiflow.utils.io import _load_metrics, _save_metrics, load_metrics, save_metrics @@ -162,7 +162,7 @@ def test_evaluate_outputs(dataset): identifier, data, resets = evaluate_outputs( outputs, einstein, - D3(method="pbe", damping="d3bj"), + ReferenceDummy(), identifier=identifier, error_thresholds_for_reset=[None, None], # never reset error_thresholds_for_discard=[None, None], @@ -177,7 +177,7 @@ def test_evaluate_outputs(dataset): identifier, data, resets = evaluate_outputs( outputs, einstein, - D3(method="pbe", damping="d3bj"), + ReferenceDummy(), identifier=identifier, error_thresholds_for_reset=[0.0, 0.0], error_thresholds_for_discard=[0.0, 0.0], @@ -224,7 +224,7 @@ def test_wandb(): def test_learning_workflow(tmp_path, gpu, mace_model, dataset): learning = Learning( - D3(method="pbe", damping="d3bj"), + ReferenceDummy(), tmp_path / "output", error_thresholds_for_reset=[None, None], error_thresholds_for_discard=[None, None], diff --git a/tests/test_reference.py b/tests/test_reference.py index 2e1c1624..8fa5b09e 100644 --- a/tests/test_reference.py +++ b/tests/test_reference.py @@ -1,5 +1,3 @@ -from pathlib import Path - import numpy as np import pytest from ase.units import Bohr, Ha @@ -8,15 +6,21 @@ import psiflow from psiflow.data import Dataset from psiflow.geometry import Geometry, NullState -from psiflow.reference import CP2K, D3, GPAW, evaluate -from psiflow.reference._cp2k import dict_to_str, parse_cp2k_output, str_to_dict +from psiflow.reference import CP2K, GPAW, ORCA, create_orca_input +from psiflow.reference.reference import Status +from psiflow.reference.cp2k_ import ( + dict_to_str, + str_to_dict, + parse_output as parse_cp2k_output, +) +from psiflow.utils.parse import get_task_logs @pytest.fixture -def simple_cp2k_input(): +def simple_cp2k_input() -> str: return """ &GLOBAL - PRINT_LEVEL LOW + PRINT_LEVEL MEDIUM &END GLOBAL &FORCE_EVAL METHOD Quickstep @@ -63,6 +67,13 @@ def simple_cp2k_input(): """ +@pytest.fixture +def geom_h2_p() -> Geometry: + # periodic H2 at ~optimized interatomic distance + pos = np.array([[0, 0, 0], [0.74, 0, 0]]) + return Geometry.from_data(numbers=np.ones(2), positions=pos, cell=5 * np.eye(3)) + + def test_cp2k_check_input(simple_cp2k_input): cp2k_input = str_to_dict(simple_cp2k_input) assert cp2k_input["force_eval"]["method"] == "quickstep" @@ -80,13 +91,20 @@ def test_cp2k_check_input(simple_cp2k_input): def test_cp2k_parse_output(): cp2k_output_str = """ +### CP2K OUTPUT SNIPPETS ### + + TOTAL NUMBERS AND MAXIMUM NUMBERS + + Total number of - Atomic kinds: 1 + - Atoms: 1 + +### SKIPPED A BIT ### + MODULE QUICKSTEP: ATOMIC COORDINATES IN ANGSTROM Atom Kind Element X Y Z Z(eff) Mass 1 1 O 8 0.000000 0.000000 0.000000 6.0000 15.9994 - - SCF PARAMETERS Density guess: RESTART -------------------------------------------------------- max_scf: 20 @@ -97,8 +115,8 @@ def test_cp2k_parse_output(): eps_scf_history: 0.00E+00 eps_diis: 1.00E-01 eps_eigval: 1.00E-05 - 18 OT LS 0.38E+00 0.1 -14.2029934070 - 19 OT CG 0.38E+00 0.2 0.00000062 -14.2029934070 -3.99E-11 + +### SKIPPED A BIT ### *** SCF run converged in 19 steps *** @@ -123,24 +141,8 @@ def test_cp2k_parse_output(): Integrated absolute spin density : 5.9999999997 Ideal and single determinant S**2 : 12.000000 12.000000 - - !-----------------------------------------------------------------------------! - Mulliken Population Analysis - - # Atom Element Kind Atomic population (alpha,beta) Net charge Spin moment - 1 O 1 6.000000 0.000000 0.000000 6.000000 - # Total charge and spin 6.000000 0.000000 0.000000 6.000000 - - !-----------------------------------------------------------------------------! - - !-----------------------------------------------------------------------------! - Hirshfeld Charges - - #Atom Element Kind Ref Charge Population Spin moment Net charge - 1 O 1 6.000 5.984 0.000 5.984 0.016 - - Total Charge 0.016 - !-----------------------------------------------------------------------------! + +### SKIPPED A BIT ### ENERGY| Total FORCE_EVAL ( QS ) energy [a.u.]: -14.202993407031412 @@ -164,91 +166,70 @@ def test_cp2k_parse_output(): STRESS| x 0.577844347143 0.756273914341 -0.306831675291 STRESS| y -0.577518702860 0.644541601612 0.501037195863 STRESS| z 0.576687140764 -0.112320480227 0.809207051007 + +### SKIPPED A BIT ### + + ------------------------------------------------------------------------------- + - - + - T I M I N G - + - - + ------------------------------------------------------------------------------- + SUBROUTINE CALLS ASD SELF TIME TOTAL TIME + MAXIMUM AVERAGE MAXIMUM AVERAGE MAXIMUM + CP2K 1 1.0 0.036 0.042 520.335 520.336 + qs_forces 1 2.0 0.001 0.001 519.254 519.255 + qs_energies 1 3.0 0.005 0.006 507.372 507.376 + +### SKIPPED A BIT ### + + **** **** ****** ** PROGRAM ENDED AT 2025-07-31 13:45:43.191 + ***** ** *** *** ** PROGRAM RAN ON node3594.doduo.os + ** **** ****** PROGRAM RAN BY + ***** ** ** ** ** PROGRAM PROCESS ID 1183304 + **** ** ******* ** PROGRAM STOPPED IN /tmp/mytmpdir.2RsSPreAda """ - geometry = Geometry.from_data( - numbers=np.array([8]), - positions=np.zeros((1, 3)), - cell=np.zeros((3, 3)), - ) - geometry = parse_cp2k_output(cp2k_output_str, ("energy", "forces"), geometry) - assert geometry.energy == -14.202993407031412 * Ha + data_out = parse_cp2k_output(cp2k_output_str, ("energy", "forces")) + assert data_out["energy"] == -14.202993407031412 * Ha + assert data_out["runtime"] == 520.336 + assert data_out["status"] == Status.SUCCESS -def test_reference_d3(context, dataset, tmp_path): - reference = D3(method="pbe", damping="d3bj") - state = evaluate(dataset[-1], reference).result() - assert state.energy is not None - assert state.energy < 0.0 # dispersion is attractive - - subset = dataset[:3] - data = subset.evaluate(reference) - energy = reference.compute(subset, "energy") - forces = reference.compute(subset, "forces") - - assert np.allclose( - data.get("energy").result(), - energy.result(), - ) - assert np.allclose( - reference.compute_atomic_energy("H").result(), - 0.0, - ) - - assert len(forces.result().shape) == 3 - @pytest.mark.filterwarnings("ignore:Original input file not found") -def test_cp2k_success(context, simple_cp2k_input): +def test_cp2k_success(simple_cp2k_input, geom_h2_p): reference = CP2K(simple_cp2k_input) - geometry = Geometry.from_data( # simple H2 at ~optimized interatomic distance - numbers=np.ones(2), - positions=np.array([[0, 0, 0], [0.74, 0, 0]]), - cell=5 * np.eye(3), - ) - - evaluated = evaluate(geometry, reference) - assert isinstance(evaluated, AppFuture) - geometry = evaluated.result() - assert geometry != NullState - - assert Path(geometry.stdout).is_file() - assert geometry.energy is not None - assert not np.any(np.isnan(geometry.per_atom.forces)) - assert np.allclose( - -1.167407360449355 * Ha, - geometry.energy, - ) - forces_reference = np.array([[-0.00968014, 0.0, 0.0], [0.00967947, 0.0, 0.0]]) - forces_reference *= Ha - forces_reference /= Bohr - assert np.allclose( - forces_reference, - geometry.per_atom.forces, - atol=1e-5, - ) - - # check whether NullState evaluates to NullState - state = evaluate(NullState, reference) - assert state.result() == NullState + future = reference.evaluate(geom_h2_p) + future_null = reference.evaluate(NullState) + assert isinstance(future, AppFuture) + geom_out = future.result() + geom_null = future_null.result() + + ref_energy = -1.167407360449355 * Ha + ref_forces = np.array([[-0.00968014, 0.0, 0.0], [0.00967947, 0.0, 0.0]]) * Ha / Bohr + assert geom_out != NullState + assert geom_out.energy is not None + assert not np.any(np.isnan(geom_out.per_atom.forces)) + assert np.allclose(ref_energy, geom_out.energy) + assert np.allclose(ref_forces, geom_out.per_atom.forces, atol=1e-5) + assert geom_null == NullState # check whether NullState evaluates to NullState # check number of mpi processes - with open(geometry.stdout, "r") as f: - content = f.read() - definition = psiflow.context().definitions["CP2K"] - ncores = definition.cores_per_worker - lines = content.split("\n") + stdout, _ = get_task_logs(geom_out.order["task_id"]) + lines = stdout.read_text().split("\n") for line in lines: if "Total number of message passing processes" in line: nprocesses = int(line.split()[-1]) if "Number of threads for this process" in line: nthreads = int(line.split()[-1]) + definition = psiflow.context().definitions["CP2K"] + ncores = definition.cores_per_worker assert ncores == nprocesses assert 1 == nthreads @pytest.mark.filterwarnings("ignore:Original input file not found") -def test_cp2k_failure(context, tmp_path): +def test_cp2k_failure(geom_h2_p): cp2k_input = """ &FORCE_EVAL METHOD Quickstep @@ -315,22 +296,17 @@ def test_cp2k_failure(context, tmp_path): &END FORCE_EVAL """ # incorrect input file reference = CP2K(cp2k_input) - geometry = Geometry.from_data( # simple H2 at ~optimized interatomic distance - numbers=np.ones(2), - positions=np.array([[0, 0, 0], [0.74, 0, 0]]), - cell=5 * np.eye(3), - ) - evaluated = evaluate(geometry, reference) - assert isinstance(evaluated, AppFuture) - state = evaluated.result() - assert state.energy is None - assert np.all(np.isnan(state.per_atom.forces)) - with open(state.stdout, "r") as f: - log = f.read() - assert "ABORT" in log # verify error is captured + future = reference.evaluate(geom_h2_p) + assert isinstance(future, AppFuture) + geom_out = future.result() + assert geom_out.energy is None + assert np.all(np.isnan(geom_out.per_atom.forces)) + stdout, _ = get_task_logs(geom_out.order["task_id"]) + assert "ABORT" in stdout.read_text() # verify error is captured -def test_cp2k_memory(context, simple_cp2k_input): +def test_cp2k_memory(simple_cp2k_input): + # TODO: test_cp2k_memory == test_cp2k_timeout until memory constraints work reference = CP2K(simple_cp2k_input) geometry = Geometry.from_data( numbers=np.ones(4000), @@ -343,60 +319,45 @@ def test_cp2k_memory(context, simple_cp2k_input): @pytest.mark.filterwarnings("ignore:Original input file not found") -def test_cp2k_timeout(context, simple_cp2k_input): +def test_cp2k_timeout(simple_cp2k_input, geom_h2_p): reference = CP2K(simple_cp2k_input) - geometry = Geometry.from_data( # simple H2 at ~optimized interatomic distance - numbers=np.ones(2), - positions=np.array([[0, 0, 0], [3, 0, 0]]), - cell=20 * np.eye(3), # box way too large - ) - energy, forces = reference.compute(Dataset([geometry])) + geom_h2_p.cell = 20 * np.eye(3) # box way too large + energy, forces = reference.compute(Dataset([geom_h2_p])) energy, forces = energy.result(), forces.result() assert np.all(np.isnan(energy)) - print(energy.shape, forces.shape) -def test_cp2k_energy(context, simple_cp2k_input): +def test_cp2k_energy(simple_cp2k_input, geom_h2_p): reference = CP2K(simple_cp2k_input, outputs=("energy",)) - geometry = Geometry.from_data( # simple H2 at ~optimized interatomic distance - numbers=np.ones(2), - positions=np.array([[0, 0, 0], [3, 0, 0]]), - cell=5 * np.eye(3), # box way too large - ) - state = evaluate(geometry, reference).result() - assert state.energy is not None - assert state.stdout is not None - assert np.all(np.isnan(state.per_atom.forces)) + geom_out = reference.evaluate(geom_h2_p).result() + assert geom_out.energy is not None + assert np.all(np.isnan(geom_out.per_atom.forces)) @pytest.mark.filterwarnings("ignore:Original input file not found") -def test_cp2k_atomic_energies( - dataset, simple_cp2k_input -): # use energy-only because why not - reference = CP2K(simple_cp2k_input, outputs=("energy",), executor="CP2K_container") - element = "H" - energy = reference.compute_atomic_energy(element, box_size=4) +def test_cp2k_atomic_energies(simple_cp2k_input): + # use energy-only because why not + reference = CP2K(simple_cp2k_input, outputs=("energy",)) + energy = reference.compute_atomic_energy("H", box_size=4) assert abs(energy.result() - (-13.6)) < 1 # reasonably close to exact value -def test_cp2k_serialize(dataset, simple_cp2k_input): +def test_cp2k_serialize(simple_cp2k_input): element = "H" reference = CP2K(simple_cp2k_input, outputs=("energy",)) assert "outputs" in reference._attrs - assert "cp2k_input_dict" in reference._attrs - assert "cp2k_input_str" in reference._attrs - energy = reference.compute_atomic_energy(element, box_size=4) + assert "input_dict" in reference._attrs data = psiflow.serialize(reference).result() - reference = psiflow.deserialize(data) - assert type(reference.outputs) is list - assert np.allclose( - energy.result(), - reference.compute_atomic_energy(element, box_size=4).result(), - ) + reference2 = psiflow.deserialize(data) + future = reference.compute_atomic_energy(element, box_size=4) + future2 = reference2.compute_atomic_energy(element, box_size=4) + assert type(reference2.outputs) is list + assert np.allclose(future.result(), future2.result()) -def test_cp2k_posthf(context): + +def test_cp2k_posthf(geom_h2_p): cp2k_input_str = """ &FORCE_EVAL METHOD Quickstep @@ -425,40 +386,43 @@ def test_cp2k_posthf(context): &END FORCE_EVAL """ reference = CP2K(cp2k_input_str, outputs=("energy",)) - geometry = Geometry.from_data( # simple H2 at ~optimized interatomic distance - numbers=np.ones(2), - positions=np.array([[0, 0, 0], [0.74, 0, 0]]), - cell=5 * np.eye(3), - ) - assert evaluate(geometry, reference).result().energy is not None - - -def test_gpaw_single(dataset, dataset_h2): - gpaw = GPAW( - mode="fd", - nbands=0, - xc="LDA", - h=0.1, - minimal_box_multiple=2, - ) - state = evaluate(dataset_h2[0], gpaw).result() - assert state.energy is not None - assert state.energy < 0.0 - assert np.allclose( - state.per_atom.positions, - dataset_h2[0].result().per_atom.positions, - ) - gpaw = GPAW( - mode="fd", - nbands=0, - xc="LDA", - h=0.1, - minimal_box_multiple=2, - executor="GPAW_container", - ) - energy = gpaw.compute(dataset_h2[:1])[0].result() - assert np.allclose(state.energy, energy) - assert gpaw.compute_atomic_energy("Zr", box_size=9).result() == 0.0 - - gpaw = GPAW(askdfj="asdfk") # invalid input - assert evaluate(dataset_h2[1], gpaw).result().energy is None + assert reference.evaluate(geom_h2_p).result().energy is not None + + +def test_gpaw_single(dataset_h2): + parameters = dict(mode="fd", nbands=0, xc="LDA", h=0.3, minimal_box_multiple=2) + gpaw = GPAW(parameters) + future_in = dataset_h2[0] + future_out = gpaw.evaluate(future_in) + future_energy = gpaw.compute(dataset_h2[:1])[0] + future_energy_zr = gpaw.compute_atomic_energy("Zr", box_size=9) + gpaw = GPAW({"askdfj": "asdfk"}) # invalid input + future_fail = gpaw.evaluate(future_in) + + geom_in, geom_out = future_in.result(), future_out.result() + energy, energy_zr = future_energy.result(), future_energy_zr.result() + + assert geom_out.energy is not None and geom_out.energy < 0.0 + assert np.allclose(geom_out.per_atom.positions, geom_in.per_atom.positions) + assert np.allclose(geom_out.energy, energy) + assert energy_zr == 0.0 + assert future_fail.result().energy is None + + +# TODO: enable once we have an ORCA container +# def test_orca_single(dataset_h2): +# input_str = create_orca_input() +# orca = ORCA(input_str) +# future_in = dataset_h2[0] +# future_out = orca.evaluate(future_in) +# future_energy = orca.compute(dataset_h2[:1])[0] +# future_energy_h = orca.compute_atomic_energy("H") +# +# geom_in, geom_out = future_in.result(), future_out.result() +# energy, energy_h = future_energy.result(), future_energy_h.result() +# +# assert geom_out.energy is not None and geom_out.energy < 0.0 +# assert np.allclose(geom_out.per_atom.positions, geom_in.per_atom.positions) +# assert np.allclose(geom_out.energy, energy) +# print(energy, energy_h) +# assert np.isclose(energy_h, -13.6)