diff --git a/enterprise_extensions/frequentist/optimal_statistic.py b/enterprise_extensions/frequentist/optimal_statistic.py index d645a5ed..530a2f13 100644 --- a/enterprise_extensions/frequentist/optimal_statistic.py +++ b/enterprise_extensions/frequentist/optimal_statistic.py @@ -4,6 +4,7 @@ import numpy as np import scipy.linalg as sl +import scipy.integrate as sint from enterprise.signals import gp_priors, signal_base, utils from enterprise_extensions import model_orfs, models @@ -18,6 +19,62 @@ def warning_on_one_line(message, category, filename, lineno, file=None, line=Non warnings.formatwarning = warning_on_one_line +def imhof(u, x, eigen_values, output="cdf"): + theta = ( + 0.5 * np.sum(np.arctan(eigen_values[:, np.newaxis] * u), axis=0) - 0.5 * x * u + ) + rho = np.prod((1.0 + (eigen_values[:, np.newaxis] * u) ** 2) ** 0.25, axis=0) + + rv = np.sin(theta) / (u * rho) if output == "cdf" else np.cos(theta) / rho + + return rv + + +def gx2pdf(eigen_values, xs, cutoff=1e-6, limit=100, epsabs=1e-6): + """Calculate the GX2 PDF as a function of sx, based off of eigenvalues 'eigen_values'""" + + eigen_values = ( + eigen_values[:cutoff] + if cutoff > 1 + else eigen_values[np.abs(eigen_values) > cutoff] + ) + + return np.array( + [ + sint.quad( + lambda u: float(imhof(u, x, eigen_values, output="pdf")), + 0, + np.inf, + limit=limit, + epsabs=epsabs, + )[0] + / (2 * np.pi) + for x in xs + ] + ) + + +def gx2cdf(eigr, xs, cutoff=1e-6, limit=100, epsabs=1e-6): + """Calculate the GX2 CDF as a function of sx, based off of eigenvalues 'eigr'""" + + eigen_values = eigr[:cutoff] if cutoff > 1 else eigr[np.abs(eigr) > cutoff] + + return np.array( + [ + 0.5 + - sint.quad( + lambda u: float(imhof(u, x, eigen_values)), + 0, + np.inf, + limit=limit, + epsabs=epsabs, + )[0] + / np.pi + for x in xs + ] + ) + + class OptimalStatistic(object): """ Class for the Optimal Statistic as used in the analysis paper. @@ -476,3 +533,483 @@ def get_FNT(self, params={}): T = sc.get_basis(params=params) FNTs.append(N.solve(T, left_array=F)) return FNTs + + +def inv_RPR(phi, r): + """Invert the RphiRT matrix""" + I = np.identity(len(r)) + cf = sl.cho_factor(phi) + phi_inv = sl.cho_solve(cf, I) + Sigma = r.T @ r + phi_inv + cf = sl.cho_factor(Sigma) + return I - r @ sl.cho_solve(cf, r.T) + + +def ensure_2d_covmat(mat): + """Make sure that the covariance matrix is 2D""" + return mat if len(mat.shape) == 2 else np.diag(mat) + + +class DetectionStatistic(object): + """ + Class for the Detection Statistic as used in the p-value paper. + + This class is specifically made for classical hypothesis testing. It + requires an enterprise object for H0 and for H1. For now we assume that we + keep the white noise fixed and that the only difference between the two + hypotheses is in the way we model the Phi prior matrix. + + :param pta_h0: The enterprise PTA object for the null hypothesis. + :param pta_hs: The enterprise PTA object for the signal hypothesis. + :param dstype: The type of detection statistic to use + DF, DFCC, NP, NPMV -- default: DFCC (traditional OS) + + References: + - Section 9, van Haasteren (2025), https://arxiv.org/abs/2506.10811 + """ + + def __init__( + self, + pta_h0, + pta_hs, + dstype="DFCC", + ): + """Initialize the Detection statistic object.""" + # set up cache + self._set_cache_parameters(pta_h0, pta_hs) + self._np_stat, self._inc_auto_terms = self._get_dstype(dstype=dstype) + + def _get_dstype(self, dstype="DFCC"): + """Set the type of detection statistic + + :param type: The type of detection statistic + DF, DFCC, NP, NPMV + + :return: None + """ + np_stat, inc_auto_terms = False, False + + if dstype.upper() in ["DF"]: + np_stat = False + inc_auto_terms = True + elif dstype.upper() in ["DFCC"]: + np_stat = False + inc_auto_terms = False + elif dstype.upper() in ["NP"]: + np_stat = True + inc_auto_terms = True + elif dstype.upper() in ["NPMV"]: + np_stat = True + inc_auto_terms = False + + return np_stat, inc_auto_terms + + def _set_cache_parameters(self, pta_h0, pta_hs): + """Set the cache parameters according to the Section 9 in van Haasteren (2025) + + :param pta_h0: The enterprise PTA object for the null hypothesis + :param pta_hs: The enterprise PTA object for the signal hypothesis. + + """ + # N = L_N L_N^T ==> L_N^{-T} L_N^{-1} = N^{-1} + # T^{prime} = L_N^{-1} Tmat ( Tprime = NT in code ) + # P_T T^{prime} = T^{prime} ===> P_T = G_T G_T^T (G_T = NU in code?) + # R = G_T @ Tprime = G_T @ L_N^{-1} Tmat + # L_0 = L_B^{-1} in code with L_B @ L_B^T = C_i = I + R phi R^T + # A = L_B^{-1} G^T_T T^{prime} + # P_F = G_F @ G_F^T (Project only on the basis of the common red noise) + # A P_F = P_A A P_F = U_A U_A^T A P_F (do with the SVD of (A P_F)) + # Then, the final data and Q are: + # chi = U_A^T @ L_0 @ G_T @ L_N^{-1} @ Tmat + # Q = U_A^T @ L_0^T @ G_T^T @ L_N^{-1} @ F @ DeltaPhi @ F^T L_N^{-T} @ G_T @ L_0 @ U_A + self.pta_h0 = pta_h0 + self.pta_hs = pta_hs + + # Calculate lists of H0 quantities (11 seconds, only need it once) + Tmat = pta_h0.get_basis({}) # List of 2D matrices + self.Ndiag = pta_h0.get_ndiag({}) # Objects for sqrtsolve + NT = [ + nd.sqrtsolve(tm) for (nd, tm) in zip(self.Ndiag, Tmat) + ] # List of 2D matrices + self.G_T = [ + sl.svd(nt, full_matrices=False)[0] for nt in NT + ] # List of 2D matrices + self.R = [gt.T @ nt for (gt, nt) in zip(self.G_T, NT)] # List of 2D matrices + + # We do this here so we can avoid calculating the mask later + # 1D array of all parameters + xs = np.array( + [par_val for p in pta_h0.params for par_val in np.atleast_1d(p.sample())] + ) + pd = pta_hs.map_params(xs) + Phi_0 = [ + ensure_2d_covmat(p) for p in pta_h0.get_phi(pd) + ] # Phi matrix of H0 -- 2D arrays + BigPhiDiff = pta_hs.get_phi(pd) - sl.block_diag(*Phi_0) + + # Get only the non-zero elements of the BigPhiDiff matrix for selections later + self.par_msk = np.sum(np.abs(BigPhiDiff), axis=1) > 0 # Mask for BigPhiDiff + par_inds_offset = np.cumsum([0] + [len(p) for p in Phi_0]) + par_inds_start = par_inds_offset[:-1] + par_inds_end = par_inds_offset[1:] + par_inds_slices = [ + np.arange(p_start, p_end) + for (p_start, p_end) in zip(par_inds_start, par_inds_end) + ] + self.par_psr_msk = [ + self.par_msk[slc] for slc in par_inds_slices + ] # Mask per psr for Phi_0 and Tmat + + def _get_compressed_coordinates(self, params, normalize_Q=True): + """Returns OS, chi, and Q for the given parameters + + :param params: The parameters to use for the calculation. + :param normalize_Q: Whether to normalize the Q matrix or not. + :return: A tuple of (chi, Q, Phi, os_rank_reduced + + """ + + # These quantities have to be re-calculated for new hyperparameters + Phi_0 = [ + ensure_2d_covmat(p) for p in self.pta_h0.get_phi(params) + ] # Phi matrix of H0 -- 2D arrays + + # This is a BIG matrix, but it's sparse. Not using that right now though + # It's currently 0.4 secdonds for NG15 + BigPhiDiff = self.pta_hs.get_phi(params) - sl.block_diag( + *Phi_0 + ) # 2D prior diff array + + # Inverse Noise matrix + # List of matrix inverses -- 2D arrays + C2i_0 = [inv_RPR(p, r) for (r, p) in zip(self.R, Phi_0)] + + # Get the Square-Root (we take it from the inv for numerical stability) + C2i_0_svd = [] + for c in C2i_0: + try: + c_svd = sl.svd(c, full_matrices=True) + except sl.LinAlgError: + # GESVD is more numerically stable, but slower + c_svd = sl.svd(c, full_matrices=True, lapack_driver="gesvd") + + C2i_0_svd.append(c_svd) + + # Select only non-singular values # Singular values -- 1D arrays + C2i_sqrt_sing = [ + np.array([(np.sqrt(sv) if np.abs(sv) > 1e-10 else 0.0) for sv in s[1]]) + for s in C2i_0_svd + ] + # L matrix -- 2D arrays + L_0 = [ + svd[0] @ np.diag(s) @ svd[0].T for (svd, s) in zip(C2i_0_svd, C2i_sqrt_sing) + ] + + # Transformation 1: # List of 1D arrays (the weighted data) + Nres = [ + nd.sqrtsolve(r - rp) + for (nd, r, rp) in zip( + self.Ndiag, self.pta_h0.get_residuals(), self.pta_h0.get_delay(params) + ) + ] + + # Transformation 2: # List of 1D arrays (transformed data) + self.GTNr = [gt.T @ nr for (gt, nr) in zip(self.G_T, Nres)] + + # Transformation 3: + # From now also construct the filter transform, because it is of manaeable size + LGNr = [ + l @ gnr for (l, gnr) in zip(L_0, self.GTNr) + ] # List of 1D arrays (transformed data) + S3 = [ + l_bi @ r for (l_bi, r) in zip(L_0, self.R) + ] # List of 2D arrays (Q transformer) + + # Slice BigPhiDiff, because we only want non-zero items! + PhiDiff = BigPhiDiff[self.par_msk, :][:, self.par_msk] + + # A = L_B^{-1} G^T_T T^{prime} = L_B^{-1} @ R + # T^{prime} = L_N^{-1} Tmat + # P_T T^{prime} = T^{prime} ===> P_T = G_T G_T^T + # S3m = S3 @ G_F (is same thing as selecting the columns of S3) + # S3 matrix with only the relevant columns + S3m = [s3[:, msk] for (msk, s3) in zip(self.par_psr_msk, S3)] + + # Need to swap the projector S3m = S3 @ G_F = P_A @ S3 @ G_F = U_A U_A^T + U_A = [sl.svd(s3m, full_matrices=False)[0][:, : s3m.shape[1]] for s3m in S3m] + + # Transformation 4: + # So now the data is: + ULGNr = [ua.T @ lgnr for (ua, lgnr) in zip(U_A, LGNr)] + S4 = [ua.T @ s3m for (ua, s3m) in zip(U_A, S3m)] + + # For testing, we could also use different coordinates: + chi = ULGNr # Whitened data + chi_tot = np.concatenate(chi) # '' + S = S4 # Q transform + Phi = PhiDiff # H1-H0 difference + + # build the list of block‐sizes and cumulative indices + block_sizes = [s.shape[0] for s in S] + idx = np.cumsum([0] + block_sizes) + self._idx = idx + + npsrs = len(block_sizes) + + # slice PhiDiff into npsrs×npsrs little blocks + Phi_blocks = [ + [Phi[idx[i] : idx[i + 1], idx[j] : idx[j + 1]] for j in range(npsrs)] + for i in range(npsrs) + ] + + # numerator (inner product) of os + # denominator (trace) of os + num = 0.0 + den2 = 0.0 + ddmat = np.zeros((npsrs, npsrs)) + Q = np.zeros_like(Phi) + for i, (Si) in enumerate(S): + for j, (Sj) in enumerate(S): + SPS = Si @ Phi_blocks[i][j] @ Sj.T + Q[idx[i] : idx[i + 1], idx[j] : idx[j + 1]] = SPS + + num += chi[i].dot(SPS @ chi[j]) + ddmat[i, j] = np.trace(SPS @ SPS.T) + den2 += ddmat[i, j] + + den = np.sqrt(2 * den2) # Factor of 2 because of *real* (not complex) data + + if normalize_Q: + Q = Q / den + + return num / den, chi_tot, Q + + def _zero_block_diagonal(self, M): + """Zero the (a==b) block-diagonal using self._idx as block boundaries (in place).""" + for i in range(len(self._idx) - 1): + ii0, ii1 = self._idx[i], self._idx[i + 1] + M[ii0:ii1, ii0:ii1] = 0.0 + + return M + + def deflection_to_np(self, Q, remove_auto_terms=True): + """ + Transform what we have into a Neyman-Pearson-Weighted-optimal statistic + This is all allowed here at this stage, because B commutes with (B + I) + That means they have a common set of eigenvectors, and we can use the + same low-rank basis for B and B + I. That was really really fortunate + and cool! + + :param Q: The deflection-optimal detection statistic filter + :param remove_auto_terms: If True, remove the auto-terms from the filter + :return: The Neyman-Pearson optimal detection statistic filter + """ + cfB = sl.cho_factor(Q + np.identity(len(Q)), lower=True) + BBi = sl.cho_solve(cfB, Q) + + if remove_auto_terms: + # Only keep the cross-terms + self._zero_block_diagonal(BBi) + + BBBi = np.dot(BBi, BBi) + den = np.sqrt(2 * np.trace(BBBi)) + return BBi / den + + def get_deflection_coordinates(self, params, normalize_Q=True): + """Return the deflection-optimal detection statistic and filter + + :param params: The parameters to use for the calculation. + :param normalize_Q: Whether to normalize the Q matrix or not. + :return: A tuple of (chi, Q, Phi, os_rank_reduced + + """ + return self._get_compressed_coordinates(params, normalize_Q=normalize_Q) + + def get_np_coordinates(self, params): + """Return the Neyman-Pearson optimal detection statistic + + The Neyman-Pearson optimal statistic is easily derived from the + deflection-optimal statistic. So just use that relationship and + re-normalize the filter + + :param params: The parameters to use for the calculation. + :return: A tuple of (chi, Q, Phi, os_rank_reduced + """ + _, chi_tot, Q = self._get_compressed_coordinates(params, normalize_Q=False) + Qnp = self.deflection_to_np(Q, remove_auto_terms=not self._inc_auto_terms) + ds = np.sum(chi_tot * np.dot(Qnp, chi_tot)) + + return ds, chi_tot, Qnp + + def compute_os(self, params): + """Get the optimal statistic value / coordinates + + :param params: The parameters to use for the calculation. + + """ + + if self._np_stat: + return self.get_np_coordinates(params)[0] + else: + return self.get_deflection_coordinates(params)[0] + + def get_fixedpar_os_distribution( + self, + params, + ds_min=-10, + ds_max=20, + cutoff=1e-6, + limit=100, + epsabs=1e-6, + Q=None, + kind="cdf", + ): + """For given parameters, get the OS distribution PDF/CDF under H0 + + :param params: The parameters to use for the calculation. + :param ds_min: The minimum value of the OS distribution. + :param ds_max: The maximum value of the OS distribution. + :param cutoff: Only eigenvalues above this value are included + :param limit: An upper bound on the number of subintervals used + in the adaptive integration algorithm + :param epsabs: The absolute error tolerance for the integration + :param Q: The Q matrix to use for the calculation. If None, + it will be computed from the parameters. + :param kind: The kind of distribution to return, either 'cdf' or 'pdf' + + :return: A tuple of (ds, dist) where ds are the detection statistic values + + """ + if Q is None: + _, _, Q = self.compute_os(params) + eigen_values = sl.eigvalsh(Q) + + xs = np.linspace(ds_min, ds_max, 1000) + + if kind == "cdf": + dist = gx2cdf(eigen_values, xs, cutoff=cutoff, limit=limit, epsabs=epsabs) + elif kind == "pdf": + dist = gx2pdf(eigen_values, xs, cutoff=cutoff, limit=limit, epsabs=epsabs) + else: + raise ValueError("Parameter 'kind' has to be cdf/pdf") + + return xs, dist + + def get_roc_curve( + self, + params, + ds_min=-10, + ds_max=20, + cutoff=1e-6, + limit=100, + epsabs=1e-6, + calc_pdf=False, + ): + """For given parameters, get the ROC curve + + :param params: The parameters to use for the calculation. + :param ds_min: The minimum value of the OS distribution. + :param ds_max: The maximum value of the OS distribution. + :param cutoff: Only eigenvalues above this value are included + :param limit: An upper bound on the number of subintervals used + in the adaptive integration algorithm + :param epsabs: The absolute error tolerance for the integration + :param calc_pdf: Whether to calculate the PDF as well. If False, + only the CDF will be calculated. + + :return: A tuple of (ds, cdf_h0, cdf_hs) if only CDFs are calculated + A tuple of (ds, pdf_h0, cdf_h0, pdf_hs, cdf_hs) if also PFDs + + """ + _, chi_tot, Q = self.get_deflection_coordinates(params, normalize_Q=False) + C = Q + np.identity(len(chi_tot)) + L = sl.cholesky(C, lower=True) + + if self._np_stat: + Q = self.deflection_to_np(Q, remove_auto_terms=not self._inc_auto_terms) + + # Normalize the filter + QQ = np.dot(Q, Q) + Q = Q / np.sqrt(2 * np.trace(QQ)) + + # Whiten filter under H_S (already white under H_0) + QH0 = Q + QHS = L.T @ Q @ L + + xs, cdf_h0 = self.get_fixedpar_os_distribution( + params, + ds_min=ds_min, + ds_max=ds_max, + cutoff=cutoff, + limit=limit, + epsabs=epsabs, + Q=QH0, + ) + + _, cdf_hs = self.get_fixedpar_os_distribution( + params, + ds_min=ds_min, + ds_max=ds_max, + cutoff=cutoff, + limit=limit, + epsabs=epsabs, + Q=QHS, + ) + + # If we are using auto terms, we need to: + if self._inc_auto_terms: + xs -= np.trace(QH0) + + if calc_pdf: + _, pdf_h0 = self.get_fixedpar_os_distribution( + params, + ds_min=ds_min, + ds_max=ds_max, + cutoff=cutoff, + limit=limit, + epsabs=epsabs, + Q=QH0, + kind="pdf", + ) + + _, pdf_hs = self.get_fixedpar_os_distribution( + params, + ds_min=ds_min, + ds_max=ds_max, + cutoff=cutoff, + limit=limit, + epsabs=epsabs, + Q=QHS, + kind="pdf", + ) + + return xs, pdf_h0, cdf_h0, pdf_hs, cdf_hs + + else: + return xs, cdf_h0, cdf_hs + + def get_fixedpar_pval(self, params, cutoff=1e-6, limit=200, epsabs=1e-9): + """calculate the p-value for the OS under H0 + + :param params: The parameters to use for the calculation. + :param ds_min: The minimum value of the OS distribution. + :param ds_max: The maximum value of the OS distribution. + :param cutoff: Only eigenvalues above this value are included + :param limit: An upper bound on the number of subintervals used + in the adaptive integration algorithm + :param epsabs: The absolute error tolerance for the integration + + :return: The p-value for the OS under H0, which is 1 - CDF(OS) + + """ + if self._np_stat: + os, _, Q = self.get_np_coordinates(params) + else: + os, _, Q = self.get_deflection_coordinates(params) + + eigen_values = sl.eigvalsh(Q) + cdf_val = gx2cdf(eigen_values, [os], cutoff=cutoff, limit=limit, epsabs=epsabs)[ + 0 + ] + + return 1 - cdf_val