Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions psiflow/free_energy/phonons.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@
from psiflow.hamiltonians import Hamiltonian, MixtureHamiltonian
from psiflow.sampling.sampling import (
setup_sockets,
label_forces,
make_force_xml,
serialize_mixture,
make_start_command,
make_client_command
)
Expand Down Expand Up @@ -139,7 +137,7 @@ def compute_harmonic(
energy_shift: float = 0.00095,
) -> AppFuture:
hamiltonian: MixtureHamiltonian = 1 * hamiltonian
names = label_forces(hamiltonian)
names = hamiltonian.get_named_components()
sockets = setup_sockets(names)
forces = make_force_xml(hamiltonian, names)

Expand Down Expand Up @@ -175,7 +173,7 @@ def compute_harmonic(
input_future,
Dataset([state]).extxyz,
]
inputs += serialize_mixture(hamiltonian, dtype="float64")
inputs += hamiltonian.serialize(dtype="float64")

client_args = []
for name in names:
Expand Down
140 changes: 75 additions & 65 deletions psiflow/hamiltonians.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import urllib
from functools import partial
from pathlib import Path
from typing import ClassVar, Optional, Union
from typing import ClassVar, Optional, Union, Callable

import numpy as np
import typeguard
Expand All @@ -28,17 +28,21 @@
from psiflow.utils.io import dump_json


# TODO: executors not very consistent for some reason (default_threads <> default_htex)

@typeguard.typechecked
@psiflow.serializable
class Hamiltonian(Computable):
outputs: ClassVar[tuple] = ("energy", "forces", "stress")
batch_size = 1000
app: ClassVar[Callable] # TODO: app is actually an instance variable, but serialization complains..
function_name: ClassVar[str]

def compute(
self,
arg: Union[Dataset, AppFuture[list], list, AppFuture, Geometry],
*outputs: Optional[str],
batch_size: Optional[int] = -1, # if -1: take class default
batch_size: Optional[int] = -1, # if -1: take class default TODO: why not just None?
) -> Union[list[AppFuture], AppFuture]:
if len(outputs) == 0:
outputs = tuple(self.__class__.outputs)
Expand All @@ -54,15 +58,14 @@ def compute(
def __eq__(self, hamiltonian: Hamiltonian) -> bool:
raise NotImplementedError

def __mul__(self, a: float) -> Hamiltonian:
def __mul__(self, a: float) -> MixtureHamiltonian:
return MixtureHamiltonian([self], [a])

def __add__(self, hamiltonian: Hamiltonian) -> Hamiltonian:
if type(hamiltonian) is Zero:
return self
if type(hamiltonian) is MixtureHamiltonian:
return hamiltonian.__add__(self)
return 1.0 * self + 1.0 * hamiltonian
mixture = MixtureHamiltonian([self], [1.])
return mixture.__add__(hamiltonian)

def __sub__(self, hamiltonian: Hamiltonian) -> Hamiltonian:
return self + (-1.0) * hamiltonian
Expand Down Expand Up @@ -101,9 +104,10 @@ def __mul__(self, a: float) -> Hamiltonian:
return Zero()

def __add__(self, hamiltonian: Hamiltonian) -> Hamiltonian:
return hamiltonian
return hamiltonian # (Zero + Hamiltonian) is different from (Hamiltonian + Zero)

__rmul__ = __mul__ # handle float * Zero

__rmul__ = __mul__ # handle float * Hamiltonian


@typeguard.typechecked
Expand Down Expand Up @@ -142,88 +146,90 @@ def compute( # override compute for efficient batching
batch_size=batch_size,
)

def __eq__(self, hamiltonian: Hamiltonian) -> bool:
def __eq__(self, hamiltonian: Hamiltonian | MixtureHamiltonian) -> bool:
if type(hamiltonian) is not MixtureHamiltonian:
return False
if len(self.coefficients) != len(hamiltonian.coefficients):
return False
for i, h in enumerate(self.hamiltonians):
if h not in hamiltonian.hamiltonians:
for c, h in zip(self.coefficients, self.hamiltonians):
if (idx := hamiltonian.get_index(h)) is None:
return False
coefficient = hamiltonian.coefficients[hamiltonian.hamiltonians.index(h)]
if self.coefficients[i] != coefficient:
if c != hamiltonian.coefficients[idx]:
return False
return True

def __mul__(self, a: float) -> Hamiltonian:
def __mul__(self, a: float) -> MixtureHamiltonian:
return MixtureHamiltonian(
self.hamiltonians,
[c * a for c in self.coefficients],
)

__rmul__ = __mul__ # handle float * MixtureHamiltonian

def __len__(self) -> int:
return len(self.coefficients)

def __add__(self, hamiltonian: Hamiltonian) -> Hamiltonian:
def __add__(self, hamiltonian: Hamiltonian | MixtureHamiltonian) -> MixtureHamiltonian:
if type(hamiltonian) is Zero:
return self
if type(hamiltonian) is not MixtureHamiltonian:
coefficients = list(self.coefficients)
hamiltonians = list(self.hamiltonians)
try:
index = hamiltonians.index(hamiltonian)
coefficients[index] += 1.0
except ValueError:
hamiltonians.append(hamiltonian)
coefficients.append(1.0)
else:
coefficients = list(hamiltonian.coefficients)
hamiltonians = list(hamiltonian.hamiltonians)
for c, h in zip(self.coefficients, self.hamiltonians):
try:
index = hamiltonians.index(h)
coefficients[index] += c
except ValueError:
hamiltonians.append(h)
coefficients.append(c)
hamiltonian = 1.0 * hamiltonian # turn into mixture

coefficients = list(hamiltonian.coefficients)
hamiltonians = list(hamiltonian.hamiltonians)
for c, h in zip(self.coefficients, self.hamiltonians):
if (idx := hamiltonian.get_index(h)) is None:
hamiltonians.append(h)
coefficients.append(c)
else:
coefficients[idx] += c
return MixtureHamiltonian(hamiltonians, coefficients)

def get_index(self, hamiltonian) -> Optional[int]:
def get_index(self, hamiltonian: Hamiltonian) -> Optional[int]:
assert type(hamiltonian) is not MixtureHamiltonian
if hamiltonian not in self.hamiltonians:
try:
return self.hamiltonians.index(hamiltonian)
except ValueError:
return None
return self.hamiltonians.index(hamiltonian)

def get_indices(self, mixture) -> Optional[tuple[int, ...]]:
def get_indices(self, mixture: MixtureHamiltonian) -> Optional[tuple[int, ...]]:
# TODO: why do we not just return None for the missing components?
assert type(mixture) is MixtureHamiltonian
for h in mixture.hamiltonians:
if h not in self.hamiltonians:
return None
indices = []
for h in mixture.hamiltonians:
indices.append(self.get_index(h))
if any([idx is None for idx in indices]):
return None
return tuple(indices)

def get_coefficient(self, hamiltonian) -> Optional[float]:
def get_coefficient(self, hamiltonian: Hamiltonian) -> Optional[float]:
assert type(hamiltonian) is not MixtureHamiltonian
if hamiltonian not in self.hamiltonians:
if (idx := self.get_index(hamiltonian)) is None:
return None
return self.coefficients[self.hamiltonians.index(hamiltonian)]
return self.coefficients[idx]

def get_coefficients(self, mixture) -> Optional[tuple[float, ...]]:
def get_coefficients(self, mixture: MixtureHamiltonian) -> Optional[tuple[float, ...]]:
assert type(mixture) is MixtureHamiltonian
for h in mixture.hamiltonians:
for h in mixture.hamiltonians: # every mixture component must be a component of self
if h not in self.hamiltonians:
return None
coefficients = []
for h in self.hamiltonians:
coefficient = mixture.get_coefficient(h)
if coefficient is None:
coefficient = 0.0
coefficients.append(coefficient)

# components of self that are not in mixture get coefficient 0
coefficients = [(mixture.get_coefficient(h) or 0) for h in self.hamiltonians]
return tuple(coefficients)

__rmul__ = __mul__ # handle float * Hamiltonian
def get_named_components(self) -> list[str]:
"""Create unique string name for every hamiltonian component to be used in i-Pi sampling"""
names, counts = [], {}
for h in self.hamiltonians:
name = h.__class__.__name__
counts.setdefault(name, 0)
names.append(f"{name}{counts[name]}")
counts[name] += 1
return names

def serialize(self, **kwargs) -> list[DataFuture]:
return [h.serialize_function(**kwargs) for h in self.hamiltonians]


@typeguard.typechecked
Expand Down Expand Up @@ -255,12 +261,12 @@ def parameters(self) -> dict:
"volume": get_attribute(self.reference_geometry, "volume"),
}

def __eq__(self, hamiltonian: Hamiltonian) -> bool:
if type(hamiltonian) is not EinsteinCrystal:
return False
if not np.allclose(self.force_constant, hamiltonian.force_constant):
return False
if self.reference_geometry != hamiltonian.reference_geometry:
def __eq__(self, hamiltonian: Hamiltonian | EinsteinCrystal) -> bool:
if (
not isinstance(hamiltonian, EinsteinCrystal) or
not np.allclose(self.force_constant, hamiltonian.force_constant) or
self.reference_geometry != hamiltonian.reference_geometry
):
return False
return True

Expand Down Expand Up @@ -302,10 +308,11 @@ def parameters(self) -> dict:
external = None
return {"plumed_input": self.plumed_input, "external": external}

def __eq__(self, other: Hamiltonian) -> bool:
if type(other) is not type(self):
return False
if self.plumed_input != other.plumed_input:
def __eq__(self, other: Hamiltonian | PlumedHamiltonian) -> bool:
if (
not isinstance(other, PlumedHamiltonian) or
self.plumed_input != other.plumed_input
):
return False
return True

Expand All @@ -322,7 +329,7 @@ def __init__(
reference_geometry: Union[Geometry, AppFuture[Geometry]],
hessian: Union[np.ndarray, AppFuture[np.ndarray]],
):
self.reference_geometry = reference_geometry
self.reference_geometry = reference_geometry # TODO: why not copy_app_future(geometry) like others?
self.hessian = hessian
self._create_apps()

Expand All @@ -343,12 +350,13 @@ def parameters(self) -> dict:
"hessian": self.hessian,
}

def __eq__(self, hamiltonian: Hamiltonian) -> bool:
def __eq__(self, hamiltonian: Hamiltonian | Harmonic) -> bool:
if type(hamiltonian) is not Harmonic:
return False
if hamiltonian.reference_geometry != self.reference_geometry:
return False

# TODO: why this check? Is it not always an ndarray?
# slightly different check for numpy arrays
is_array0 = type(hamiltonian.hessian) is np.ndarray
is_array1 = type(self.hessian) is np.ndarray
Expand Down Expand Up @@ -409,7 +417,7 @@ def parameters(self) -> dict:
"env_vars": evaluation.env_vars,
}

def __eq__(self, hamiltonian) -> bool:
def __eq__(self, hamiltonian: Hamiltonian | MACEHamiltonian) -> bool:
if type(hamiltonian) is not MACEHamiltonian:
return False
if self.external.filepath != hamiltonian.external.filepath:
Expand All @@ -424,6 +432,8 @@ def __eq__(self, hamiltonian) -> bool:
return False
return True

# TODO: the methods below are outdated..

@classmethod
def mace_mp0(cls, size: str = "small") -> MACEHamiltonian:
urls = dict(
Expand Down
5 changes: 2 additions & 3 deletions psiflow/sampling/ase.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from psiflow.geometry import Geometry
from psiflow.hamiltonians import Hamiltonian
from psiflow.utils.io import dump_json
from psiflow.sampling.sampling import serialize_mixture, label_forces
from psiflow.utils import TMP_COMMAND, CD_COMMAND

from ._ase import ALLOWED_MODES
Expand Down Expand Up @@ -79,8 +78,8 @@ def optimize(

input_geometry = Dataset([state]).extxyz
hamiltonian = 1.0 * hamiltonian # convert to mixture
names, coeffs = label_forces(hamiltonian), hamiltonian.coefficients
input_forces = serialize_mixture(hamiltonian, dtype="float64") # double precision for MLPs
names, coeffs = hamiltonian.get_named_components(), hamiltonian.coefficients
input_forces = hamiltonian.serialize(dtype="float64") # double precision for MLPs
forces = [
dict(forcefield=n, weight=str(c), file=f.filename) for n, c, f in zip(names, coeffs, input_forces)
]
Expand Down
3 changes: 1 addition & 2 deletions psiflow/sampling/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@

@typeguard.typechecked
def potential_component_names(n: int):
str_format = "pot_component_raw({})"
return [str_format.format(i) + "{electronvolt}" for i in range(n)]
return [f'pot_component_raw({i}){{electronvolt}}' for i in range(n)]


@typeguard.typechecked
Expand Down
Loading
Loading