diff --git a/strkit/mi/result.py b/strkit/mi/result.py index e2c1eed..e4fc130 100644 --- a/strkit/mi/result.py +++ b/strkit/mi/result.py @@ -15,7 +15,7 @@ from strkit.logger import get_main_logger from strkit.utils import cat_strs, cis_overlap -from typing import Generator, Iterable, Optional, Union +from typing import Generator, Iterable, Literal, Optional, Union __all__ = [ "MILocusData", @@ -26,7 +26,18 @@ OptionalReadCounts = Optional[tuple[tuple[int, ...], tuple[int, ...]]] -INHERITANCE_CONFIGS = ( +ParentInheritanceConfig = tuple[Literal[0, 1], Literal[0, 1]] +InheritanceConfig = tuple[Literal[0, 1], Literal[0, 1], Literal[0, 1], Literal[0, 1]] + +# Only the parental parts of the below INHERITANCE_CONFIGS constant - which alleles were inherited from each parent. +PARENT_INHERITANCE_CONFIGS: tuple[ParentInheritanceConfig, ...] = ( + (0, 0), # maternal 0 allele, paternal 0 allele + (0, 1), # maternal 0 allele, paternal 1 allele + (1, 0), # maternal 1 allele, paternal 0 allele + (1, 1), # maternal 1 allele, paternal 1 allele +) + +INHERITANCE_CONFIGS: tuple[InheritanceConfig, ...] = ( (0, 1, 0, 0), # allele 0 maternal, allele 1 paternal, maternal 0 allele, paternal 0 allele (0, 1, 0, 1), # allele 0 maternal, allele 1 paternal, maternal 0 allele, paternal 1 allele (0, 1, 1, 0), # allele 0 maternal, allele 1 paternal, maternal 1 allele, paternal 0 allele @@ -38,6 +49,8 @@ (1, 0, 1, 1), # allele 1 maternal, allele 0 paternal, maternal 1 allele, paternal 1 allele ) +MutationFrom = Literal["none", "?", "mat", "pat", "both"] + class MILocusData: def __init__( @@ -60,6 +73,7 @@ def __init__( widen: float = 0, test_to_perform: str = "none", + sig_level: float = 0.05, logger: Optional[logging.Logger] = None, ): @@ -90,10 +104,14 @@ def __init__( self._decimal_threshold: float = 0.5 self._widen: float = widen + # --- begin de novo mutation-related fields/initialization --- self._p_value = None self._adj_p_value = None - if test_to_perform != "none": - self._p_value = self.de_novo_test(test_to_perform) + self._most_likely_config: Optional[ParentInheritanceConfig] = None + self._mutation_from: MutationFrom = "none" + if test_to_perform != "none" and (de_novo_res := self.de_novo_test(test_to_perform, sig_level)): + self._p_value, self._most_likely_config, self._mutation_from = de_novo_res + # --- end de novo mutation-related fields/initialization --- self._logger = logger or get_main_logger() @@ -186,6 +204,14 @@ def adj_p_value(self, value: Optional[float]): raise e self._adj_p_value = value + @property + def most_likely_inheritance_config(self) -> Optional[InheritanceConfig]: + return self._most_likely_config + + @property + def mutation_from(self) -> MutationFrom: + return self._mutation_from + @staticmethod def _respects_strict_ci(c_gt, m_gt, f_gt) -> bool: # First hypothesis: first allele from mother, second from father @@ -251,6 +277,17 @@ def _respects_mi_ci(self, c_gt_ci, m_gt_ci, f_gt_ci, widen: float) -> Optional[b self._logger.error(f"Encountered invalid child confidence intervals: {c_gt_ci} ({e})") return None + @staticmethod + def _mutation_from_mat_pat_p_vals(mat_p: float, pat_p: float, unadj_sig_level: float) -> MutationFrom: + if mat_p < unadj_sig_level and pat_p < unadj_sig_level: + return "both" + elif mat_p < unadj_sig_level: + return "mat" + elif pat_p < unadj_sig_level: + return "pat" + else: + return "?" + def respects_mi(self, widen: Optional[float] = None) -> tuple[bool, bool, Optional[bool], Optional[bool]]: fn = self._respects_decimal_ci if self._decimal else MILocusData._respects_strict_ci respects_mi_strict = fn(self._child_gt, self._mother_gt, self._father_gt) @@ -272,17 +309,27 @@ def respects_mi(self, widen: Optional[float] = None) -> tuple[bool, bool, Option return respects_mi_strict, respects_mi_pm1, respects_mi_95_ci, respects_mi_99_ci - def de_novo_test(self, test: str) -> Optional[float]: - test_res = [] + def de_novo_test( + self, test: str, unadj_sig_level: float + ) -> Optional[tuple[float, ParentInheritanceConfig, MutationFrom]]: + most_likely_p_val: float = 0.0 + most_likely_config: ParentInheritanceConfig = PARENT_INHERITANCE_CONFIGS[0] cr_all = self._child_read_counts_flattened cr_all_set = set(cr_all) - for config in INHERITANCE_CONFIGS[:4]: # Don't need child allele configs here + for config in PARENT_INHERITANCE_CONFIGS: # Don't need child allele configs here currently # TODO: I'm pretty sure this can be more numpy-ified / sped up - mat_reads = self._mother_read_counts[config[2]] - pat_reads = self._father_read_counts[config[3]] + # TODO: split alleles, do all configs, and generate two p-values instead of one? or separately determine + # which allele comes from where? + # Proposed flow: 1) have two p-values per locus; probability of being drawn from the same dist for allele 0 + # and for allele 1 - multiply together to determine 'most likely' (sketchy statistics) + # Then, after multiple testing correction, any p-values below the threshold are used to determine of + # there's a mutation from maternal, paternal, or both, right before output. + + mat_reads = self._mother_read_counts[config[0]] + pat_reads = self._father_read_counts[config[1]] parent_all = np.array((*mat_reads, *pat_reads)) @@ -330,14 +377,48 @@ def de_novo_test(self, test: str) -> Optional[float]: # Same-variance requirements should hold almost all the time, unless there is suspected mosaicism # (and then X2 should be used instead) p_val = sst.mannwhitneyu( - cr_all, parent_all, + cr_all, alternative="greater" if test == "wmw-gt" else "two-sided", use_continuity=False)[1] - test_res.append(float(p_val)) # cast to make sure we're in native float type + p_val_f = float(p_val) # cast to make sure we're in native float type + + if p_val_f > most_likely_p_val: + most_likely_p_val = p_val_f + most_likely_config = config + + # Figure out where mutation is probably from, if any (multiple testing correction will be applied later, so we + # will report a "mutation_from" which may not be significant in the end.) + mutation_from: MutationFrom = "none" + + if most_likely_p_val < unadj_sig_level: + mat_reads = self._mother_read_counts[most_likely_config[0]] + pat_reads = self._father_read_counts[most_likely_config[1]] + + mat_dist_config_1 = np.abs(np.mean(self._child_read_counts[0]) - np.mean(mat_reads)) + pat_dist_config_1 = np.abs(np.mean(self._child_read_counts[1]) - np.mean(pat_reads)) + dist_config_1 = mat_dist_config_1 + pat_dist_config_1 + + mat_dist_config_2 = np.abs(np.mean(self._child_read_counts[1]) - np.mean(mat_reads)) + pat_dist_config_2 = np.abs(np.mean(self._child_read_counts[0]) - np.mean(pat_reads)) + dist_config_2 = mat_dist_config_2 + pat_dist_config_2 + + if dist_config_2 > dist_config_1: + # Choose configuration 1, since it minimizes distance + mat_test = sst.mannwhitneyu(mat_reads, self._child_read_counts[0], use_continuity=False)[1] + pat_test = sst.mannwhitneyu(pat_reads, self._child_read_counts[1], use_continuity=False)[1] + mutation_from = self._mutation_from_mat_pat_p_vals(mat_test, pat_test, unadj_sig_level) + elif dist_config_1 > dist_config_2: + # Choose configuration 2, since it minimizes distance + mat_test = sst.mannwhitneyu(mat_reads, self._child_read_counts[1], use_continuity=False)[1] + pat_test = sst.mannwhitneyu(pat_reads, self._child_read_counts[0], use_continuity=False)[1] + mutation_from = self._mutation_from_mat_pat_p_vals(mat_test, pat_test, unadj_sig_level) + else: + # no difference between distances to guess mutation origin configuration + mutation_from = "?" - return max(test_res) + return most_likely_p_val, most_likely_config, mutation_from def __iter__(self): # for dict casting yield "contig", self._contig @@ -356,8 +437,9 @@ def __iter__(self): # for dict casting if self._p_value: yield "p", self._p_value - if self._adj_p_value: - yield "p_adj", self._adj_p_value + yield "mutation_from", self._mutation_from + if self._adj_p_value: + yield "p_adj", self._adj_p_value def __str__(self): def _opt_ci_str(ci_str, level="95"):