diff --git a/src/pint/models/__init__.py b/src/pint/models/__init__.py index f22f45d38a..d13ceec8c9 100644 --- a/src/pint/models/__init__.py +++ b/src/pint/models/__init__.py @@ -34,7 +34,7 @@ FDJumpDM, ) from pint.models.dmwavex import DMWaveX -from pint.models.expdip import SimpleExponentialDip +from pint.models.transient_events import SimpleExponentialDip, ChromaticGaussianEvent from pint.models.fdjump import FDJump from pint.models.frequency_dependent import FD from pint.models.glitch import Glitch diff --git a/src/pint/models/expdip.py b/src/pint/models/expdip.py deleted file mode 100644 index 3082956b6b..0000000000 --- a/src/pint/models/expdip.py +++ /dev/null @@ -1,305 +0,0 @@ -"""Exponential dips due to, e.g., profile change events in J1713+0747""" - -from typing import List, Optional, Union -import numpy as np -import astropy.units as u -from astropy.time import Time -from pint.models.parameter import floatParameter, prefixParameter -from pint.models.timing_model import DelayComponent -from pint.toa import TOAs - - -class SimpleExponentialDip(DelayComponent): - """Simple chromatic exponential dip model for events like the profile changes - in J1713+0747. - - The dip is modeled as a logistic function multiplied by an exponential decay. - This is different from the tempo2 implementation where the exponential decay - is multiplied by a Heaviside step function. We cannot fit for the event epoch - while using the latter because it is not differentiable at the event epoch. - This implementation approaches the tempo2 version in the limit EXPDIPEPS -> 0. - The parameter names are also different between this implementation and tempo2. - The tempo2 parameter names have been added here as aliases so that tempo2 par files - can be read. - - The explicit mathematical form of the dips is as follows. - - .. math:: - \\Delta_{\\text{dip}}(t)=-\\sum_{i}A_{i}\\left(\\frac{f}{f_{\\text{ref}}}\\right)^{\\gamma_{i}}\\left(\\frac{\\tau_{i}}{\\epsilon}\\right)^{\\epsilon/\\tau_{i}}\\left(\\frac{\\tau_{i}}{\\tau_{i}-\\epsilon}\\right)^{\\frac{\\tau_{i}-\\epsilon}{\\tau_{i}}}\\frac{\\exp\\left[-\\frac{t-T_{i}}{\\tau_{i}}\\right]}{1+\\exp\\left[-\\frac{t-T_{i}}{\\epsilon}\\right]} - - An exponential dip event is normalized such that the EXPDIPAMP_ is its extremum - value. Note that the extremum occurs slightly after the event epoch. - - Parameters supported: - - .. paramtable:: - :class: pint.models.expdip.SimpleExponentialDip - """ - - register = True - category = "simple_exp_dip" - - def __init__(self): - super().__init__() - - self.add_param( - floatParameter( - name=f"EXPDIPEPS", - units="day", - description="Chromatic exponential dip step timescale", - value=1e-3, - frozen=True, - tcb2tdb_scale_factor=1, - ) - ) - - self.add_param( - floatParameter( - name=f"EXPDIPFREF", - units="MHz", - description="Chromatic exponential dip reference frequency", - value=1400, - frozen=True, - tcb2tdb_scale_factor=1, - ) - ) - - self.add_exp_dip(None, 0, None, None, index=1, frozen=True) - - self.delay_funcs_component += [self.expdip_delay] - - def add_exp_dip( - self, - epoch: Union[float, u.Quantity, Time], - ampl: Union[float, u.Quantity], - gamma: Union[float, u.Quantity], - tau: Union[float, u.Quantity], - index: Optional[int] = None, - frozen: bool = True, - ) -> int: - """Add an exponential dip to the model.""" - - if index is None: - dct = self.get_prefix_mapping_component("EXPEP_") - index = np.max(list(dct.keys())) + 1 - elif int(index) in self.get_prefix_mapping_component("EXPEP_"): - raise ValueError( - f"Index '{index}' is already in use in this model. Please choose another." - ) - - if isinstance(epoch, Time): - epoch = epoch.mjd - elif isinstance(epoch, u.Quantity): - epoch = epoch.value - - self.add_param( - prefixParameter( - name=f"EXPDIPEP_{index}", - units="MJD", - description="Chromatic exponential dip epoch", - parameter_type="MJD", - time_scale="utc", - value=epoch, - frozen=frozen, - tcb2tdb_scale_factor=1, - prefix_aliases=["EXPEP_"], - ) - ) - - if isinstance(ampl, u.Quantity): - ampl = ampl.to_value(u.s) - - self.add_param( - prefixParameter( - name=f"EXPDIPAMP_{index}", - units="s", - value=ampl, - description="Chromatic exponential dip amplitude", - parameter_type="float", - frozen=frozen, - tcb2tdb_scale_factor=1, - prefix_aliases=["EXPPH_"], - ) - ) - - if isinstance(gamma, u.Quantity): - gamma = gamma.to_value(u.s) - - self.add_param( - prefixParameter( - name=f"EXPDIPIDX_{index}", - units="", - value=gamma, - description="Chromatic exponential dip index", - parameter_type="float", - frozen=frozen, - tcb2tdb_scale_factor=1, - prefix_aliases=["EXPINDEX_"], - ) - ) - - if isinstance(tau, u.Quantity): - tau = tau.to_value(u.s) - - self.add_param( - prefixParameter( - name=f"EXPDIPTAU_{index}", - units="day", - value=tau, - description="Chromatic exponential dip decay timescale", - parameter_type="float", - frozen=frozen, - tcb2tdb_scale_factor=1, - prefix_aliases=["EXPTAU_"], - ) - ) - - self.setup() - self.validate() - - return index - - def remove_exp_dip(self, index: Union[float, int, List[int], np.ndarray]) -> None: - """Removes all exp dip parameters associated with a given index/list of indices. - - Parameters - ---------- - - index : float, int, list, np.ndarray - Number or list/array of numbers corresponding to DMX indices to be removed from model. - """ - - if isinstance(index, (int, float, np.int64)): - indices = [index] - elif isinstance(index, (list, set, np.ndarray)): - indices = index - else: - raise TypeError( - f"index must be a float, int, set, list, or array - not {type(index)}" - ) - for index in indices: - index_rf = f"{int(index):d}" - for prefix in ["EXPDIPEP_", "EXPDIPAMP_", "EXPDIPTAU_", "EXPDIPIDX_"]: - self.remove_param(f"{prefix}{index_rf}") - self.validate() - - def get_indices(self) -> np.ndarray: - """Returns an array of integers corresponding to exp dip parameters. - - Returns - ------- - inds : np.ndarray - Array of exp dip indices in model. - """ - inds = [int(p.split("_")[-1]) for p in self.params if "EXPDIPEP_" in p] - return np.array(inds) - - def setup(self) -> None: - super().setup() - # Get DMX mapping. - # Register the DMX derivatives - for prefix_par in self.get_params_of_type("prefixParameter"): - if prefix_par.startswith("EXPDIPEP_"): - self.register_deriv_funcs(self.d_delay_d_T, prefix_par) - elif prefix_par.startswith("EXPDIPAMP_"): - self.register_deriv_funcs(self.d_delay_d_A, prefix_par) - elif prefix_par.startswith("EXPDIPTAU_"): - self.register_deriv_funcs(self.d_delay_d_tau, prefix_par) - elif prefix_par.startswith("EXPDIPIDX_"): - self.register_deriv_funcs(self.d_delay_d_gamma, prefix_par) - - def get_ffac(self, toas: TOAs) -> np.ndarray: - """Compute f/fref where f is the observing frequency.""" - f = self._parent.barycentric_radio_freq(toas) - fref = self.EXPDIPFREF.quantity - return (f / fref).to_value(u.dimensionless_unscaled) - - def expdip_delay_term( - self, t_mjd: np.ndarray, ffac: np.ndarray, ii: int - ) -> u.Quantity: - """Compute the delay for a single exponential dip event.""" - T = getattr(self, f"EXPDIPEP_{ii}").value - dt = (t_mjd - T) * u.day - - A = getattr(self, f"EXPDIPAMP_{ii}").quantity - gamma = getattr(self, f"EXPDIPIDX_{ii}").quantity - tau = getattr(self, f"EXPDIPTAU_{ii}").quantity - eps = self.EXPDIPEPS.quantity - - # Done this way to avoid overflow in exp - expfac = np.zeros(len(dt)) - expfac[dt >= 0] = np.exp(-dt[dt >= 0] / tau) / (1 + np.exp(-dt[dt >= 0] / eps)) - expfac[dt < 0] = np.exp(dt[dt < 0] * (tau - eps) / (tau * eps)) / ( - 1 + np.exp(dt[dt < 0] / eps) - ) - - return ( - -A - * ffac**gamma - * (tau / eps) ** (eps / tau) - * (tau / (tau - eps)) ** ((tau - eps) / tau) - * expfac - ) - - def expdip_delay(self, toas: TOAs, acc_delay=None) -> u.Quantity: - """Total exponential dip delay.""" - indices = self.get_indices() - - ffac = self.get_ffac(toas) - t_mjd = toas["tdbld"] - - delay = np.zeros(len(toas)) * u.s - for ii in indices: - delay += self.expdip_delay_term(t_mjd, ffac, ii) - - return delay - - def d_delay_d_A(self, toas: TOAs, param: str, acc_delay=None) -> u.Quantity: - """Derivative of delay w.r.t. exponential dip amplitude.""" - ii = getattr(self, param).index - ffac = self.get_ffac(toas) - A = getattr(self, f"EXPDIPAMP_{ii}").quantity - return self.expdip_delay_term(toas["tdbld"], ffac, ii) / A - - def d_delay_d_gamma(self, toas: TOAs, param: str, acc_delay=None) -> u.Quantity: - """Derivative of delay w.r.t. exponential dip chromatic index.""" - ii = getattr(self, param).index - ffac = self.get_ffac(toas) - return self.expdip_delay_term(toas["tdbld"], ffac, ii) * np.log(ffac) - - def d_delay_d_tau(self, toas: TOAs, param: str, acc_delay=None) -> u.Quantity: - """Derivative of delay w.r.t. exponential dip decay timescale.""" - ii = getattr(self, param).index - ffac = self.get_ffac(toas) - - t0_mjd = getattr(self, f"EXPDIPEP_{ii}").value - dt = (toas["tdbld"] - t0_mjd) * u.day - - tau = getattr(self, f"EXPDIPTAU_{ii}").quantity - eps = self.EXPDIPEPS.quantity - - return ( - self.expdip_delay_term(toas["tdbld"], ffac, ii) - * (dt + eps * np.log(eps / (tau - eps))) - / tau**2 - ) - - def d_delay_d_T(self, toas: TOAs, param: str, acc_delay=None) -> u.Quantity: - """Derivative of delay w.r.t. exponential dip epoch.""" - ii = getattr(self, param).index - ffac = self.get_ffac(toas) - - T = getattr(self, f"EXPDIPEP_{ii}").value - dt = (toas["tdbld"] - T) * u.day - - tau = getattr(self, f"EXPDIPTAU_{ii}").quantity - eps = self.EXPDIPEPS.quantity - - # Done this way to avoid overflow in exp - expfac1 = np.zeros(len(dt)) - expfac1[dt >= 0] = np.exp(-dt[dt >= 0] / eps) / (1 + np.exp(-dt[dt >= 0] / eps)) - expfac1[dt < 0] = 1 / (1 + np.exp(dt[dt < 0] / eps)) - - return self.expdip_delay_term(toas["tdbld"], ffac, ii) * ( - (1 / tau) - (1 / eps) * expfac1 - ) diff --git a/src/pint/models/transient_events.py b/src/pint/models/transient_events.py new file mode 100644 index 0000000000..d6e7ac912a --- /dev/null +++ b/src/pint/models/transient_events.py @@ -0,0 +1,656 @@ +"""Transient chromatic events due to, e.g., profile change events in J1713+0747 or extreme scattering events (ESE)""" + +from typing import List, Optional, Union +import numpy as np +import astropy.units as u +from astropy.time import Time +from pint.models.parameter import floatParameter, prefixParameter +from pint.models.timing_model import DelayComponent +from pint.toa import TOAs + + +class SimpleExponentialDip(DelayComponent): + """Simple chromatic exponential dip model for events like the profile changes + in J1713+0747. + + The dip is modeled as a logistic function multiplied by an exponential decay. + This is different from the tempo2 implementation where the exponential decay + is multiplied by a Heaviside step function. We cannot fit for the event epoch + while using the latter because it is not differentiable at the event epoch. + This implementation approaches the tempo2 version in the limit EXPDIPEPS -> 0. + The parameter names are also different between this implementation and tempo2. + The tempo2 parameter names have been added here as aliases so that tempo2 par files + can be read. + + The explicit mathematical form of the dips is as follows. + + .. math:: + \\Delta_{\\text{dip}}(t)=-\\sum_{i}A_{i}\\left(\\frac{f}{f_{\\text{ref}}}\\right)^{\\gamma_{i}}\\left(\\frac{\\tau_{i}}{\\epsilon}\\right)^{\\epsilon/\\tau_{i}}\\left(\\frac{\\tau_{i}}{\\tau_{i}-\\epsilon}\\right)^{\\frac{\\tau_{i}-\\epsilon}{\\tau_{i}}}\\frac{\\exp\\left[-\\frac{t-T_{i}}{\\tau_{i}}\\right]}{1+\\exp\\left[-\\frac{t-T_{i}}{\\epsilon}\\right]} + + An exponential dip event is normalized such that the EXPDIPAMP_ is its extremum + value. Note that the extremum occurs slightly after the event epoch. + + Parameters supported: + + .. paramtable:: + :class: pint.models.transient_events.SimpleExponentialDip + """ + + register = True + category = "simple_exp_dip" + + def __init__(self): + super().__init__() + + self.add_param( + floatParameter( + name=f"EXPDIPEPS", + units="day", + description="Chromatic exponential dip step timescale", + value=1e-3, + frozen=True, + tcb2tdb_scale_factor=1, + ) + ) + + self.add_param( + floatParameter( + name=f"EXPDIPFREF", + units="MHz", + description="Chromatic exponential dip reference frequency", + value=1400, + frozen=True, + tcb2tdb_scale_factor=1, + ) + ) + + self.add_exp_dip(None, 0, None, None, index=1, frozen=True) + + self.delay_funcs_component += [self.expdip_delay] + + def add_exp_dip( + self, + epoch: Union[float, u.Quantity, Time], + ampl: Union[float, u.Quantity], + gamma: Union[float, u.Quantity], + tau: Union[float, u.Quantity], + index: Optional[int] = None, + frozen: bool = True, + ) -> int: + """Add an exponential dip to the model.""" + + if index is None: + dct = self.get_prefix_mapping_component("EXPEP_") + index = np.max(list(dct.keys())) + 1 + elif int(index) in self.get_prefix_mapping_component("EXPEP_"): + raise ValueError( + f"Index '{index}' is already in use in this model. Please choose another." + ) + + if isinstance(epoch, Time): + epoch = epoch.mjd + elif isinstance(epoch, u.Quantity): + epoch = epoch.value + + self.add_param( + prefixParameter( + name=f"EXPDIPEP_{index}", + units="MJD", + description="Chromatic exponential dip epoch", + parameter_type="MJD", + time_scale="utc", + value=epoch, + frozen=frozen, + tcb2tdb_scale_factor=1, + prefix_aliases=["EXPEP_"], + ) + ) + + if isinstance(ampl, u.Quantity): + ampl = ampl.to_value(u.s) + + self.add_param( + prefixParameter( + name=f"EXPDIPAMP_{index}", + units="s", + value=ampl, + description="Chromatic exponential dip amplitude", + parameter_type="float", + frozen=frozen, + tcb2tdb_scale_factor=1, + prefix_aliases=["EXPPH_"], + ) + ) + + if isinstance(gamma, u.Quantity): + gamma = gamma.to_value(u.s) + + self.add_param( + prefixParameter( + name=f"EXPDIPIDX_{index}", + units="", + value=gamma, + description="Chromatic exponential dip index", + parameter_type="float", + frozen=frozen, + tcb2tdb_scale_factor=1, + prefix_aliases=["EXPINDEX_"], + ) + ) + + if isinstance(tau, u.Quantity): + tau = tau.to_value(u.s) + + self.add_param( + prefixParameter( + name=f"EXPDIPTAU_{index}", + units="day", + value=tau, + description="Chromatic exponential dip decay timescale", + parameter_type="float", + frozen=frozen, + tcb2tdb_scale_factor=1, + prefix_aliases=["EXPTAU_"], + ) + ) + + self.setup() + self.validate() + + return index + + def remove_exp_dip(self, index: Union[float, int, List[int], np.ndarray]) -> None: + """Removes all exp dip parameters associated with a given index/list of indices. + + Parameters + ---------- + + index : float, int, list, np.ndarray + Number or list/array of numbers corresponding to DMX indices to be removed from model. + """ + + if isinstance(index, (int, float, np.int64)): + indices = [index] + elif isinstance(index, (list, set, np.ndarray)): + indices = index + else: + raise TypeError( + f"index must be a float, int, set, list, or array - not {type(index)}" + ) + for index in indices: + index_rf = f"{int(index):d}" + for prefix in ["EXPDIPEP_", "EXPDIPAMP_", "EXPDIPTAU_", "EXPDIPIDX_"]: + self.remove_param(f"{prefix}{index_rf}") + self.validate() + + def get_indices(self) -> np.ndarray: + """Returns an array of integers corresponding to exp dip parameters. + + Returns + ------- + inds : np.ndarray + Array of exp dip indices in model. + """ + inds = [int(p.split("_")[-1]) for p in self.params if "EXPDIPEP_" in p] + return np.array(inds) + + def setup(self) -> None: + super().setup() + # Get DMX mapping. + # Register the DMX derivatives + for prefix_par in self.get_params_of_type("prefixParameter"): + if prefix_par.startswith("EXPDIPEP_"): + self.register_deriv_funcs(self.d_delay_d_T, prefix_par) + elif prefix_par.startswith("EXPDIPAMP_"): + self.register_deriv_funcs(self.d_delay_d_A, prefix_par) + elif prefix_par.startswith("EXPDIPTAU_"): + self.register_deriv_funcs(self.d_delay_d_tau, prefix_par) + elif prefix_par.startswith("EXPDIPIDX_"): + self.register_deriv_funcs(self.d_delay_d_gamma, prefix_par) + + def get_ffac(self, toas: TOAs) -> np.ndarray: + """Compute f/fref where f is the observing frequency.""" + f = self._parent.barycentric_radio_freq(toas) + fref = self.EXPDIPFREF.quantity + return (f / fref).to_value(u.dimensionless_unscaled) + + def expdip_delay_term( + self, t_mjd: np.ndarray, ffac: np.ndarray, ii: int + ) -> u.Quantity: + """Compute the delay for a single exponential dip event.""" + T = getattr(self, f"EXPDIPEP_{ii}").value + dt = (t_mjd - T) * u.day + + A = getattr(self, f"EXPDIPAMP_{ii}").quantity + gamma = getattr(self, f"EXPDIPIDX_{ii}").quantity + tau = getattr(self, f"EXPDIPTAU_{ii}").quantity + eps = self.EXPDIPEPS.quantity + + # Done this way to avoid overflow in exp + expfac = np.zeros(len(dt)) + expfac[dt >= 0] = np.exp(-dt[dt >= 0] / tau) / (1 + np.exp(-dt[dt >= 0] / eps)) + expfac[dt < 0] = np.exp(dt[dt < 0] * (tau - eps) / (tau * eps)) / ( + 1 + np.exp(dt[dt < 0] / eps) + ) + + return ( + -A + * ffac**gamma + * (tau / eps) ** (eps / tau) + * (tau / (tau - eps)) ** ((tau - eps) / tau) + * expfac + ) + + def expdip_delay(self, toas: TOAs, acc_delay=None) -> u.Quantity: + """Total exponential dip delay.""" + indices = self.get_indices() + + ffac = self.get_ffac(toas) + t_mjd = toas["tdbld"] + + delay = np.zeros(len(toas)) * u.s + for ii in indices: + delay += self.expdip_delay_term(t_mjd, ffac, ii) + + return delay + + def d_delay_d_A(self, toas: TOAs, param: str, acc_delay=None) -> u.Quantity: + """Derivative of delay w.r.t. exponential dip amplitude.""" + ii = getattr(self, param).index + ffac = self.get_ffac(toas) + A = getattr(self, f"EXPDIPAMP_{ii}").quantity + return self.expdip_delay_term(toas["tdbld"], ffac, ii) / A + + def d_delay_d_gamma(self, toas: TOAs, param: str, acc_delay=None) -> u.Quantity: + """Derivative of delay w.r.t. exponential dip chromatic index.""" + ii = getattr(self, param).index + ffac = self.get_ffac(toas) + return self.expdip_delay_term(toas["tdbld"], ffac, ii) * np.log(ffac) + + def d_delay_d_tau(self, toas: TOAs, param: str, acc_delay=None) -> u.Quantity: + """Derivative of delay w.r.t. exponential dip decay timescale.""" + ii = getattr(self, param).index + ffac = self.get_ffac(toas) + + t0_mjd = getattr(self, f"EXPDIPEP_{ii}").value + dt = (toas["tdbld"] - t0_mjd) * u.day + + tau = getattr(self, f"EXPDIPTAU_{ii}").quantity + eps = self.EXPDIPEPS.quantity + + return ( + self.expdip_delay_term(toas["tdbld"], ffac, ii) + * (dt + eps * np.log(eps / (tau - eps))) + / tau**2 + ) + + def d_delay_d_T(self, toas: TOAs, param: str, acc_delay=None) -> u.Quantity: + """Derivative of delay w.r.t. exponential dip epoch.""" + ii = getattr(self, param).index + ffac = self.get_ffac(toas) + + T = getattr(self, f"EXPDIPEP_{ii}").value + dt = (toas["tdbld"] - T) * u.day + + tau = getattr(self, f"EXPDIPTAU_{ii}").quantity + eps = self.EXPDIPEPS.quantity + + # Done this way to avoid overflow in exp + expfac1 = np.zeros(len(dt)) + expfac1[dt >= 0] = np.exp(-dt[dt >= 0] / eps) / (1 + np.exp(-dt[dt >= 0] / eps)) + expfac1[dt < 0] = 1 / (1 + np.exp(dt[dt < 0] / eps)) + + return self.expdip_delay_term(toas["tdbld"], ffac, ii) * ( + (1 / tau) - (1 / eps) * expfac1 + ) + + +class ChromaticGaussianEvent(DelayComponent): + r"""Simple chromatic Gaussian model for extreme scattering events or other chromatic transient features + + This phenomenological model is defined as a Gaussian in time multiplied by a power law in radio frequency with variable chromaticity. + The model parameters include the event epoch, sign, log10 amplitude, chromatic index, and log10 standard deviation of the Gaussian. + See Coles et al. 2015 (https://arxiv.org/abs/1506.07948) for more details at extreme scattering events. + See Reardon et al. 2023 (https://arxiv.org/abs/2306.16229) for an application of this model in PTAs, + but note a typo in Reardon et al., which lacks a negative sign in the exponential. + The explicit mathematical form of the Gaussian event is as follows. + + .. math:: + + \Delta_{\text{Gaussian}}(t) = \sum_{i} \text{SIGN}_{i} \cdot 10^{\text{LOGAMP}_{i}} + \left(\frac{f}{f_{\text{ref}}}\right)^{-\text{CHROMIDX}_{i}} + \exp\left[-\frac{(t-\text{EPOCH}_{i})^2}{2\left(10^{\text{LOGSIG}_{i}}\right)^2}\right] + + where :math:`\text{LOGAMP}_{i} = \log_{10}(A_i / \text{s})`, + :math:`\text{LOGSIG}_{i} = \log_{10}(\sigma_i / \text{days})`, + :math:`\text{EPOCH}_{i}` is in MJD, :math:`f_{\text{ref}} = \text{CHROMGAUSS_FREF}` in MHz, + and :math:`\text{SIGN}_{i} \in \{-1, +1\}`. + + + Parameters supported: + + .. paramtable:: + :class: pint.models.transient_events.ChromaticGaussianEvent + + Examples + -------- + Add a chromatic Gaussian event to an existing PINT timing model: + + >>> import numpy as np + >>> from pint.models.transient_events import ChromaticGaussianEvent + >>> chrom_gauss = ChromaticGaussianEvent() + >>> model.add_component(chrom_gauss) + >>> chrom_comp = model.components['ChromaticGaussianEvent'] + >>> chrom_comp.add_chromatic_gaussian_event( + ... 55000, # EPOCH (MJD) + ... np.log10(5e-6), # LOGAMP log10(s) + ... 4, # CHROMIDX: chromatic index (f/fref)^-chromidx + ... np.log10(300), # LOGSIGMA log10(days) + ... 1, # SIGN: event sign + ... index=1, + ... force=True, + ... ) + + Notes + ----- + The above example will appear in the par file as such:: + + CHROMGAUSS_FREF 1400 + CHROMGAUSS_EPOCH_1 55000.0 1 + CHROMGAUSS_LOGAMP_1 -5.301 1 + CHROMGAUSS_CHROMIDX_1 4.0 0 + CHROMGAUSS_LOGSIG_1 2.477 1 + CHROMGAUSS_SIGN_1 1 0 + + For an additional event replace the ``_1`` suffix with ``_2``. + The derivatives w.r.t SIGN and EPOCH can be quite large so it is recommended to fix SIGN and start from a good estimate of EPOCH. + """ + + register = True + category = "chromatic_gaussian_event" + + def __init__(self): + super().__init__() + + self.add_param( + floatParameter( + name=f"CHROMGAUSS_FREF", + units="MHz", + description="Chromatic Gaussian event reference frequency", + value=1400, + frozen=True, + tcb2tdb_scale_factor=1, + ) + ) + + # Dummy event so the model builder can discover this component + self.add_chromatic_gaussian_event(None, 0, 0, 0, 1, index=1, frozen=True) + + # Register delay function once (it checks for events internally) + self.delay_funcs_component += [self.chrom_gauss_delay] + + def add_chromatic_gaussian_event( + self, + epoch: Union[float, u.Quantity, Time], + log10amp: Union[float, u.Quantity], + chromidx: Union[float, u.Quantity], + log10sigma: Union[float, u.Quantity], + sign: Union[int, float, u.Quantity], + index: Optional[int] = 1, + frozen: bool = True, + force: bool = False, + ) -> int: + """ + Add a chromatic Gaussian event to the model. + Use force = True to overwrite existing event at given index. + """ + + if index is None: + dct = self.get_prefix_mapping_component("CHROMGAUSS_EPOCH_") + if dct: + index = np.max(list(dct.keys())) + 1 + else: + index = 1 # Start at 1 if no events exist yet + elif int(index) in self.get_prefix_mapping_component("CHROMGAUSS_EPOCH_"): + if not force: + raise ValueError( + f"Index '{index}' is already in use in this model. Please choose another." + ) + else: + # Remove existing event so we can re-add with new values + self.remove_chromatic_gaussian_event(int(index)) + + if isinstance(epoch, Time): + epoch = epoch.mjd + elif isinstance(epoch, u.Quantity): + epoch = epoch.value + + self.add_param( + prefixParameter( + name=f"CHROMGAUSS_EPOCH_{index}", + units="MJD", + description="Chromatic Gaussian event epoch", + parameter_type="MJD", + time_scale="utc", + value=epoch, + frozen=frozen, + tcb2tdb_scale_factor=1, + ) + ) + + self.add_param( + prefixParameter( + name=f"CHROMGAUSS_LOGAMP_{index}", + units="", + value=log10amp, + description="Log10 Chromatic Gaussian event amplitude (log10 in seconds)", + parameter_type="float", + frozen=frozen, + tcb2tdb_scale_factor=1, + ) + ) + + self.add_param( + prefixParameter( + name=f"CHROMGAUSS_CHROMIDX_{index}", + units="", + value=chromidx, + description="Chromatic Gaussian event chromatic index", + parameter_type="float", + frozen=frozen, + tcb2tdb_scale_factor=1, + ) + ) + + if isinstance(sign, u.Quantity): + sign = sign.value + sign = np.sign(sign) # get the actual sign + + self.add_param( + prefixParameter( + name=f"CHROMGAUSS_SIGN_{index}", + units="", + value=sign, + description="Chromatic Gaussian event sign", + parameter_type="float", + frozen=True, + tcb2tdb_scale_factor=1, + ) + ) + + self.add_param( + prefixParameter( + name=f"CHROMGAUSS_LOGSIG_{index}", + units="", + value=log10sigma, + description="Log10 Chromatic Gaussian event standard deviation (log10 in days)", + parameter_type="float", + frozen=frozen, + tcb2tdb_scale_factor=1, + ) + ) + + self.setup() + self.validate() + + return index + + def remove_chromatic_gaussian_event( + self, index: Union[float, int, List[int], np.ndarray] + ) -> None: + """Removes all chromatic Gaussian event parameters associated with a given index/list of indices. + + Parameters + ---------- + + index : float, int, list, np.ndarray + Number or list/array of numbers corresponding to chromatic Gaussian event indices to be removed from model. + """ + + if isinstance(index, (int, float, np.int64)): + indices = [index] + elif isinstance(index, (list, set, np.ndarray)): + indices = index + else: + raise TypeError( + f"index must be a float, int, set, list, or array - not {type(index)}" + ) + for index in indices: + index_rf = f"{int(index):d}" + for prefix in [ + "CHROMGAUSS_EPOCH_", + "CHROMGAUSS_LOGAMP_", + "CHROMGAUSS_LOGSIG_", + "CHROMGAUSS_CHROMIDX_", + "CHROMGAUSS_SIGN_", + ]: + param_name = f"{prefix}{index_rf}" + if param_name in self.params: + self.remove_param(param_name) + + self.setup() + self.validate() + # Also re-setup parent model if attached, so model-level derivative + # registry is cleaned up properly. + if hasattr(self, "_parent") and self._parent is not None: + self._parent.setup() + + def get_indices(self) -> np.ndarray: + """Returns an array of integers corresponding to chromatic Gaussian event parameters. + + Returns + ------- + inds : np.ndarray + Array of chromatic Gaussian event indices in model. + """ + inds = [int(p.split("_")[-1]) for p in self.params if "CHROMGAUSS_EPOCH_" in p] + return np.array(inds) + + def setup(self) -> None: + super().setup() + # Get mapping. + # Register the ChromGauss derivatives + for prefix_par in self.get_params_of_type("prefixParameter"): + if prefix_par.startswith("CHROMGAUSS_EPOCH_"): + self.register_deriv_funcs(self.d_delay_d_epoch, prefix_par) + elif prefix_par.startswith("CHROMGAUSS_LOGAMP_"): + self.register_deriv_funcs(self.d_delay_d_log10amp, prefix_par) + elif prefix_par.startswith("CHROMGAUSS_LOGSIG_"): + self.register_deriv_funcs(self.d_delay_d_log10sigma, prefix_par) + elif prefix_par.startswith("CHROMGAUSS_CHROMIDX_"): + self.register_deriv_funcs(self.d_delay_d_chromidx, prefix_par) + elif prefix_par.startswith("CHROMGAUSS_SIGN_"): + self.register_deriv_funcs(self.d_delay_d_sign, prefix_par) + + def get_ffac(self, toas: TOAs) -> np.ndarray: + """Compute f/fref where f is the observing frequency.""" + f = self._parent.barycentric_radio_freq(toas) + fref = self.CHROMGAUSS_FREF.quantity + return (f / fref).to_value(u.dimensionless_unscaled) + + def chrom_gauss_delay_term( + self, t_mjd: np.ndarray, ffac: np.ndarray, ii: int + ) -> u.Quantity: + """Compute the delay for a single chromatic Gaussian event.""" + T = getattr(self, f"CHROMGAUSS_EPOCH_{ii}").value + dt = (t_mjd - T) * u.day + + log10Amp = getattr(self, f"CHROMGAUSS_LOGAMP_{ii}").value + chromidx = getattr(self, f"CHROMGAUSS_CHROMIDX_{ii}").value + log10sigma = getattr(self, f"CHROMGAUSS_LOGSIG_{ii}").value + sign = getattr(self, f"CHROMGAUSS_SIGN_{ii}").value + + sigma = 10 ** (log10sigma) * u.day + return ( + sign + * 10**log10Amp + * u.s + * np.exp(-0.5 * (dt.value) ** 2 / (sigma.value) ** 2) + * ffac ** (-chromidx) + ) + + def chrom_gauss_delay(self, toas: TOAs, acc_delay=None) -> u.Quantity: + """Total delay across all chromatic Gaussian events.""" + indices = self.get_indices() + + ffac = self.get_ffac(toas) + t_mjd = toas["tdbld"] + + delay = np.zeros(len(toas)) * u.s + for ii in indices: + delay += self.chrom_gauss_delay_term(t_mjd, ffac, ii) + + return delay + + def d_delay_d_log10amp(self, toas: TOAs, param: str, acc_delay=None) -> u.Quantity: + """Derivative of delay w.r.t. chromatic Gaussian amplitude.""" + ii = getattr(self, param).index + ffac = self.get_ffac(toas) + return self.chrom_gauss_delay_term(toas["tdbld"], ffac, ii) * np.log(10) + + def d_delay_d_chromidx(self, toas: TOAs, param: str, acc_delay=None) -> u.Quantity: + """Derivative of delay w.r.t. chromatic Gaussian index.""" + ii = getattr(self, param).index + ffac = self.get_ffac(toas) + return self.chrom_gauss_delay_term(toas["tdbld"], ffac, ii) * np.log(ffac**-1) + + def d_delay_d_log10sigma( + self, toas: TOAs, param: str, acc_delay=None + ) -> u.Quantity: + """Derivative of delay w.r.t. chromatic Gaussian st. deviation.""" + ii = getattr(self, param).index + ffac = self.get_ffac(toas) + + t0_mjd = getattr(self, f"CHROMGAUSS_EPOCH_{ii}").value + dt = (toas["tdbld"] - t0_mjd) * u.day + + log10sigma = getattr(self, f"CHROMGAUSS_LOGSIG_{ii}").value + # sigma = 10 ** (log10sigma) * u.day + + return ( + self.chrom_gauss_delay_term(toas["tdbld"], ffac, ii) + * (dt.value) ** 2 + / (10 ** (2 * log10sigma)) + * np.log(10) + ) + + def d_delay_d_epoch(self, toas: TOAs, param: str, acc_delay=None) -> u.Quantity: + """Derivative of delay w.r.t. chromatic Gaussian epoch.""" + ii = getattr(self, param).index + ffac = self.get_ffac(toas) + + T = getattr(self, f"CHROMGAUSS_EPOCH_{ii}").value + dt = (toas["tdbld"] - T) * u.day + + log10sigma = getattr(self, f"CHROMGAUSS_LOGSIG_{ii}").value + sigma_sq = 10 ** (2 * log10sigma) * u.day**2 + + return self.chrom_gauss_delay_term(toas["tdbld"], ffac, ii) * (dt / sigma_sq) + + def d_delay_d_sign(self, toas: TOAs, param: str, acc_delay=None) -> u.Quantity: + """Derivative of delay w.r.t. chromatic Gaussian sign.""" + ii = getattr(self, param).index + ffac = self.get_ffac(toas) + # Derivative w.r.t. sign is just the delay term divided by sign + sign = getattr(self, f"CHROMGAUSS_SIGN_{ii}").value + return self.chrom_gauss_delay_term(toas["tdbld"], ffac, ii) / sign diff --git a/src/pint/utils.py b/src/pint/utils.py index d9e7e294ff..e2bb92cfc8 100644 --- a/src/pint/utils.py +++ b/src/pint/utils.py @@ -359,6 +359,9 @@ def has_astropy_unit(x: Any) -> bool: re.compile(r"^([a-zA-Z]+)0*(\d+)$"), # For the prefix like F12 re.compile(r"^([a-zA-Z0-9]+_)(\d+)$"), # For the prefix like DMXR1_3 re.compile(r"([a-zA-Z]+_[a-zA-Z]+)(\d+)$"), # for prefixes like NE_SW2? + re.compile( + r"^([a-zA-Z]+_[a-zA-Z]+_)(\d+)$" + ), # for prefixes like CHROMGAUSS_EPOCH_1 ] diff --git a/tests/test_expdip.py b/tests/test_expdip.py deleted file mode 100644 index 34264b5841..0000000000 --- a/tests/test_expdip.py +++ /dev/null @@ -1,67 +0,0 @@ -from io import StringIO -import numpy as np -import pytest -import astropy.units as u - -from pint.fitter import Fitter -from pint.models.model_builder import get_model -from pint.simulation import make_fake_toas_uniform -from pint.residuals import Residuals - - -@pytest.fixture -def model_and_toas(): - par = """ - RAJ 05:00:00 1 - DECJ 15:00:00 1 - POSEPOCH 55000 - F0 100 1 - F1 -1e-15 1 - PEPOCH 55000 - EXPDIPEP_1 54764.272428904194001 1 - EXPDIPAMP_1 1.6641670367524487e-06 1 - EXPDIPTAU_1 112.00425959054773 1 - EXPDIPIDX_1 -1.9148109887274356 1 - EXPEP_2 55764.272428904194001 1 - EXPPH_2 2.6641670367524487e-06 1 - EXPTAU_2 102.00425959054773 1 - EXPINDEX_2 -2.9148109887274356 1 - EXPDIPEPS 0.01 - TZRMJD 55000 - TZRSITE pks - TZRFRQ 1400 - EPHEM DE440 - CLOCK TT(BIPM2019) - UNITS TDB - """ - m = get_model(StringIO(par)) - - freqs = np.linspace(500, 1500, 8) * u.MHz - t = make_fake_toas_uniform( - 54000, - 58000, - 4000, - m, - freq=freqs, - obs="pks", - add_noise=True, - multi_freqs_in_epoch=True, - ) - - return m, t - - -def test_expdip(model_and_toas): - m, t = model_and_toas - - assert len(m.components["SimpleExponentialDip"].get_indices()) == 2 - - res = Residuals(t, m) - assert res.reduced_chi2 < 1.5 - - ftr = Fitter.auto(t, m) - ftr.fit_toas(maxiter=10) - assert ftr.resids.reduced_chi2 < 1.5 - - for p in m.free_params: - assert (m[p].value - ftr.model[p].value) / ftr.model[p].uncertainty_value < 3 diff --git a/tests/test_transient_events.py b/tests/test_transient_events.py new file mode 100644 index 0000000000..578f654355 --- /dev/null +++ b/tests/test_transient_events.py @@ -0,0 +1,443 @@ +from io import StringIO +import numpy as np +import pytest +import astropy.units as u + +from pint.fitter import Fitter +from pint.models.model_builder import get_model +from pint.models.transient_events import ChromaticGaussianEvent +from pint.simulation import make_fake_toas_uniform +from pint.residuals import Residuals + + +@pytest.fixture +def model_and_toas(): + par = """ + RAJ 05:00:00 1 + DECJ 15:00:00 1 + POSEPOCH 55000 + F0 100 1 + F1 -1e-15 1 + PEPOCH 55000 + EXPDIPEP_1 54764.272428904194001 1 + EXPDIPAMP_1 1.6641670367524487e-06 1 + EXPDIPTAU_1 112.00425959054773 1 + EXPDIPIDX_1 -1.9148109887274356 1 + EXPEP_2 55764.272428904194001 1 + EXPPH_2 2.6641670367524487e-06 1 + EXPTAU_2 102.00425959054773 1 + EXPINDEX_2 -2.9148109887274356 1 + EXPDIPEPS 0.01 + TZRMJD 55000 + TZRSITE pks + TZRFRQ 1400 + EPHEM DE440 + CLOCK TT(BIPM2019) + UNITS TDB + """ + m = get_model(StringIO(par)) + + freqs = np.linspace(500, 1500, 8) * u.MHz + t = make_fake_toas_uniform( + 54000, + 58000, + 4000, + m, + freq=freqs, + obs="pks", + add_noise=True, + multi_freqs_in_epoch=True, + ) + + return m, t + + +def test_expdip(model_and_toas): + m, t = model_and_toas + + assert len(m.components["SimpleExponentialDip"].get_indices()) == 2 + + res = Residuals(t, m) + assert res.reduced_chi2 < 1.5 + + ftr = Fitter.auto(t, m) + ftr.fit_toas(maxiter=10) + assert ftr.resids.reduced_chi2 < 1.5 + + for p in m.free_params: + assert (m[p].value - ftr.model[p].value) / ftr.model[p].uncertainty_value < 3 + + +# ============================================================ +# ChromaticGaussianEvent tests +# ============================================================ + +BASE_PAR = """ + RAJ 05:00:00 1 + DECJ 15:00:00 1 + POSEPOCH 55000 + F0 100 1 + F1 -1e-15 1 + PEPOCH 55000 + TZRMJD 55000 + TZRSITE @ + TZRFRQ 1400 + EPHEM DE440 + CLOCK TT(BIPM2019) + UNITS TDB +""" + + +@pytest.fixture +def base_model(): + """A minimal timing model without any chromatic Gaussian events.""" + return get_model(StringIO(BASE_PAR)) + + +@pytest.fixture +def base_toas(base_model): + """Fake TOAs spanning the range where events will be injected.""" + freqs = np.linspace(500, 1500, 8) * u.MHz + return make_fake_toas_uniform( + 54000, + 58000, + 4000, + base_model, + freq=freqs, + obs="@", + add_noise=True, + multi_freqs_in_epoch=True, + ) + + +def _add_chromgauss_component(model): + """Helper to add a ChromaticGaussianEvent component to a model.""" + comp = ChromaticGaussianEvent() + model.add_component(comp) + comp.remove_chromatic_gaussian_event(1) + return model + + +def test_chromgauss_add_one(base_model, base_toas): + """Add a single chromatic Gaussian event and verify parameters and delay.""" + m = _add_chromgauss_component(base_model) + comp = m.components["ChromaticGaussianEvent"] + + idx = comp.add_chromatic_gaussian_event( + epoch=55500.0, + log10amp=-6.0, + chromidx=2.0, + log10sigma=1.5, + sign=1, + frozen=True, + ) + + assert idx == 1 + assert len(comp.get_indices()) == 1 + assert 1 in comp.get_indices() + + # All five prefix parameters should be present + assert hasattr(comp, "CHROMGAUSS_EPOCH_1") + assert hasattr(comp, "CHROMGAUSS_LOGAMP_1") + assert hasattr(comp, "CHROMGAUSS_CHROMIDX_1") + assert hasattr(comp, "CHROMGAUSS_LOGSIG_1") + assert hasattr(comp, "CHROMGAUSS_SIGN_1") + + # Delay computation should not raise + delay = comp.chrom_gauss_delay(base_toas) + assert delay.unit == u.s + assert len(delay) == len(base_toas) + + # Residuals should be finite + res = Residuals(base_toas, m) + assert np.all(np.isfinite(res.calc_time_resids().value)) + + +def test_chromgauss_add_two(base_model, base_toas): + """Add two chromatic Gaussian events and verify both are tracked.""" + m = _add_chromgauss_component(base_model) + comp = m.components["ChromaticGaussianEvent"] + + idx1 = comp.add_chromatic_gaussian_event( + epoch=55000.0, + log10amp=-6.0, + chromidx=2.0, + log10sigma=1.5, + sign=1, + ) + idx2 = comp.add_chromatic_gaussian_event( + epoch=56000.0, + log10amp=-5.5, + chromidx=2.0, + log10sigma=1.3, + sign=-1, + index=2, + ) + + assert idx1 == 1 + assert idx2 == 2 + assert len(comp.get_indices()) == 2 + assert set(comp.get_indices()) == {1, 2} + + # Delay should be the sum of both events + delay = comp.chrom_gauss_delay(base_toas) + assert delay.unit == u.s + + res = Residuals(base_toas, m) + assert np.all(np.isfinite(res.calc_time_resids().value)) + + +def test_chromgauss_remove(base_model, base_toas): + """Add two events, remove one, then remove the other.""" + m = _add_chromgauss_component(base_model) + comp = m.components["ChromaticGaussianEvent"] + + comp.add_chromatic_gaussian_event( + epoch=55000.0, log10amp=-6.0, chromidx=2.0, log10sigma=1.5, sign=1 + ) + comp.add_chromatic_gaussian_event( + epoch=56000.0, log10amp=-5.5, chromidx=2.0, log10sigma=1.3, sign=-1, index=2 + ) + assert len(comp.get_indices()) == 2 + + # Remove event 1 + comp.remove_chromatic_gaussian_event(1) + assert len(comp.get_indices()) == 1 + assert 2 in comp.get_indices() + assert 1 not in comp.get_indices() + assert not hasattr(comp, "CHROMGAUSS_EPOCH_1") + + # Residuals should still work with one event + res = Residuals(base_toas, m) + assert np.all(np.isfinite(res.calc_time_resids().value)) + + # Remove event 2 + comp.remove_chromatic_gaussian_event(2) + assert len(comp.get_indices()) == 0 + + # Delay should be zero with no events + delay = comp.chrom_gauss_delay(base_toas) + assert np.allclose(delay.value, 0.0) + + +def test_chromgauss_remove_multiple(base_model): + """Remove multiple events at once using a list of indices.""" + m = _add_chromgauss_component(base_model) + comp = m.components["ChromaticGaussianEvent"] + + for i in range(3): + comp.add_chromatic_gaussian_event( + epoch=55000.0 + i * 500, + log10amp=-6.0, + chromidx=2.0, + log10sigma=1.5, + sign=1, + index=i + 1, + ) + assert len(comp.get_indices()) == 3 + + comp.remove_chromatic_gaussian_event([1, 3]) + assert len(comp.get_indices()) == 1 + assert 2 in comp.get_indices() + + +def test_chromgauss_add_force(base_model): + """Adding an event with force=True should replace the existing one.""" + m = _add_chromgauss_component(base_model) + comp = m.components["ChromaticGaussianEvent"] + + comp.add_chromatic_gaussian_event( + epoch=55500.0, log10amp=-6.0, chromidx=2.0, log10sigma=1.5, sign=1, index=1 + ) + assert comp.CHROMGAUSS_LOGAMP_1.value == -6.0 + + # Force re-add with a different amplitude + comp.add_chromatic_gaussian_event( + epoch=55500.0, + log10amp=-7.0, + chromidx=2.0, + log10sigma=1.5, + sign=1, + index=1, + force=True, + ) + assert comp.CHROMGAUSS_LOGAMP_1.value == -7.0 + assert len(comp.get_indices()) == 1 + + +def test_chromgauss_add_duplicate_raises(base_model): + """Adding to an existing index without force should raise ValueError.""" + m = _add_chromgauss_component(base_model) + comp = m.components["ChromaticGaussianEvent"] + + comp.add_chromatic_gaussian_event( + epoch=55500.0, log10amp=-6.0, chromidx=2.0, log10sigma=1.5, sign=1, index=1 + ) + + with pytest.raises(ValueError, match="already in use"): + comp.add_chromatic_gaussian_event( + epoch=55500.0, + log10amp=-7.0, + chromidx=2.0, + log10sigma=1.5, + sign=1, + index=1, + ) + + +@pytest.fixture +def chromgauss_par_model_and_toas(): + """A model with one chromatic Gaussian event loaded from a par string, plus fake TOAs.""" + par = """ + RAJ 05:00:00 1 + DECJ 15:00:00 1 + POSEPOCH 55000 + F0 100 1 + F1 -1e-15 1 + PEPOCH 55000 + CHROMGAUSS_FREF 1400 + CHROMGAUSS_EPOCH_1 55500 1 + CHROMGAUSS_LOGAMP_1 -6.0 1 + CHROMGAUSS_CHROMIDX_1 2.0 1 + CHROMGAUSS_SIGN_1 1.0 0 + CHROMGAUSS_LOGSIG_1 1.5 1 + TZRMJD 55000 + TZRSITE @ + TZRFRQ 1400 + EPHEM DE440 + CLOCK TT(BIPM2019) + UNITS TDB + """ + m = get_model(StringIO(par)) + + np.random.seed(42) + freqs = np.linspace(500, 1500, 8) * u.MHz + t = make_fake_toas_uniform( + 54000, + 58000, + 4000, + m, + freq=freqs, + obs="@", + add_noise=True, + multi_freqs_in_epoch=True, + ) + + return m, t + + +def test_chromgauss_from_par(chromgauss_par_model_and_toas): + """Verify that a ChromaticGaussianEvent model can be read from a par string.""" + m, t = chromgauss_par_model_and_toas + + assert "ChromaticGaussianEvent" in m.components + comp = m.components["ChromaticGaussianEvent"] + assert len(comp.get_indices()) == 1 + assert 1 in comp.get_indices() + + assert comp.CHROMGAUSS_EPOCH_1.value == 55500.0 + assert comp.CHROMGAUSS_LOGAMP_1.value == -6.0 + assert comp.CHROMGAUSS_CHROMIDX_1.value == 2.0 + assert comp.CHROMGAUSS_SIGN_1.value == 1.0 + assert comp.CHROMGAUSS_LOGSIG_1.value == 1.5 + + res = Residuals(t, m) + assert np.all(np.isfinite(res.calc_time_resids().value)) + + +def test_chromgauss_fit(chromgauss_par_model_and_toas): + """Fit a model with a chromatic Gaussian event to simulated TOAs and check parameter recovery.""" + m, t = chromgauss_par_model_and_toas + + res = Residuals(t, m) + assert res.reduced_chi2 < 1.5 + + ftr = Fitter.auto(t, m) + ftr.fit_toas(maxiter=10) + assert ftr.resids.reduced_chi2 < 1.5 + + for p in m.free_params: + # Skip parameters with extremely tiny uncertainties + if ftr.model[p].uncertainty_value < 1e-15: + continue + assert abs(m[p].value - ftr.model[p].value) / ftr.model[p].uncertainty_value < 4 + + +@pytest.fixture +def chromgauss_par_two_events_model_and_toas(): + """A model with two chromatic Gaussian events loaded from a par string.""" + par = """ + RAJ 05:00:00 1 + DECJ 15:00:00 1 + POSEPOCH 55000 + F0 100 1 + F1 -1e-15 1 + PEPOCH 55000 + CHROMGAUSS_FREF 1400 + CHROMGAUSS_EPOCH_1 55500 1 + CHROMGAUSS_LOGAMP_1 -6.0 1 + CHROMGAUSS_CHROMIDX_1 2.0 1 + CHROMGAUSS_SIGN_1 1.0 0 + CHROMGAUSS_LOGSIG_1 1.5 1 + CHROMGAUSS_EPOCH_2 56500 1 + CHROMGAUSS_LOGAMP_2 -5.5 1 + CHROMGAUSS_CHROMIDX_2 2.0 1 + CHROMGAUSS_SIGN_2 -1.0 0 + CHROMGAUSS_LOGSIG_2 1.3 1 + TZRMJD 55000 + TZRSITE @ + TZRFRQ 1400 + EPHEM DE440 + CLOCK TT(BIPM2019) + UNITS TDB + """ + m = get_model(StringIO(par)) + + np.random.seed(123) + freqs = np.linspace(500, 1500, 8) * u.MHz + t = make_fake_toas_uniform( + 54000, + 58000, + 4000, + m, + freq=freqs, + obs="@", + add_noise=True, + multi_freqs_in_epoch=True, + ) + + return m, t + + +def test_chromgauss_from_par_two_events(chromgauss_par_two_events_model_and_toas): + """Verify that two ChromaticGaussianEvents can be parsed from a par string.""" + m, t = chromgauss_par_two_events_model_and_toas + + comp = m.components["ChromaticGaussianEvent"] + assert len(comp.get_indices()) == 2 + assert set(comp.get_indices()) == {1, 2} + + assert comp.CHROMGAUSS_EPOCH_2.value == 56500.0 + assert comp.CHROMGAUSS_LOGAMP_2.value == -5.5 + assert comp.CHROMGAUSS_SIGN_2.value == -1.0 + + res = Residuals(t, m) + assert np.all(np.isfinite(res.calc_time_resids().value)) + + +def test_chromgauss_fit_two_events(chromgauss_par_two_events_model_and_toas): + """Fit a model with two chromatic Gaussian events.""" + m, t = chromgauss_par_two_events_model_and_toas + + res = Residuals(t, m) + assert res.reduced_chi2 < 1.5 + + ftr = Fitter.auto(t, m) + ftr.fit_toas(maxiter=10) + assert ftr.resids.reduced_chi2 < 1.5 + + for p in m.free_params: + # Skip parameters with extremely tiny uncertainties + if ftr.model[p].uncertainty_value < 1e-15: + continue + assert abs(m[p].value - ftr.model[p].value) / ftr.model[p].uncertainty_value < 4