|
| 1 | +from __future__ import annotations |
| 2 | + |
| 3 | +import pysam |
| 4 | + |
| 5 | +from typing import Optional |
| 6 | + |
| 7 | +from .base import BaseCalculator |
| 8 | +from .result import MIContigResult, MILocusData |
| 9 | +from .vcf_utils import VCFCalculatorMixin |
| 10 | + |
| 11 | +__all__ = ["LongTRCalculator"] |
| 12 | + |
| 13 | + |
| 14 | +class LongTRCalculator(BaseCalculator, VCFCalculatorMixin): |
| 15 | + def _get_sample_contigs(self) -> tuple[set, set, set]: |
| 16 | + return self.get_contigs_from_files(self._mother_call_file, self._father_call_file, self._child_call_file) |
| 17 | + |
| 18 | + def calculate_contig(self, contig: str) -> MIContigResult: |
| 19 | + cr = MIContigResult(contig, includes_95_ci=True) |
| 20 | + |
| 21 | + mvf = pysam.VariantFile(str(self._mother_call_file)) |
| 22 | + fvf = pysam.VariantFile(str(self._father_call_file)) |
| 23 | + cvf = pysam.VariantFile(str(self._child_call_file)) |
| 24 | + |
| 25 | + # We want all common loci, so loop through the child and then look for the loci in the parent calls |
| 26 | + |
| 27 | + for cv in cvf.fetch(contig): |
| 28 | + mv = next(mvf.fetch(contig, cv.start, cv.stop), None) |
| 29 | + fv = next(fvf.fetch(contig, cv.start, cv.stop), None) |
| 30 | + |
| 31 | + # TODO: Handle sex chromosomes |
| 32 | + |
| 33 | + k = (contig, cv.start, cv.stop) |
| 34 | + |
| 35 | + if self.should_skip_locus(*k): |
| 36 | + continue |
| 37 | + |
| 38 | + cr.seen_locus(*k) |
| 39 | + |
| 40 | + if mv is None or fv is None: |
| 41 | + # Variant isn't found in at least one of the parents, so we can't do anything with it. |
| 42 | + # TODO: We need to actually check calls, and check with sample ID, not just assume |
| 43 | + continue |
| 44 | + |
| 45 | + # TODO: Handle missing samples gracefully |
| 46 | + # TODO: Handle wrong formatted VCFs gracefully |
| 47 | + |
| 48 | + # Need to dig up original motif from the locus file - thus, the original locus file is required. |
| 49 | + motif: Optional[str] = next(iter(self.get_loci_overlapping(*k)), (None,))[-1] |
| 50 | + if not motif: |
| 51 | + continue |
| 52 | + |
| 53 | + motif_len = len(motif) |
| 54 | + |
| 55 | + cs = cv.samples[self._child_id or 0] |
| 56 | + ms = mv.samples[self._mother_id or 0] |
| 57 | + fs = fv.samples[self._father_id or 0] |
| 58 | + |
| 59 | + c_gt = tuple(sorted(len(cv.alleles[g]) / motif_len for g in cs["GT"])) if None not in cs["GT"] else None |
| 60 | + m_gt = tuple(sorted(len(mv.alleles[g]) / motif_len for g in ms["GT"])) if None not in ms["GT"] else None |
| 61 | + f_gt = tuple(sorted(len(fv.alleles[g]) / motif_len for g in fs["GT"])) if None not in fs["GT"] else None |
| 62 | + |
| 63 | + if c_gt is None or m_gt is None or f_gt is None: |
| 64 | + # None call in VCF, skip this call |
| 65 | + continue |
| 66 | + |
| 67 | + cr.append(MILocusData( |
| 68 | + contig=contig, |
| 69 | + start=cv.pos, |
| 70 | + end=cv.stop, |
| 71 | + motif=motif, |
| 72 | + |
| 73 | + child_gt=c_gt, mother_gt=m_gt, father_gt=f_gt, |
| 74 | + |
| 75 | + decimal=True, |
| 76 | + )) |
| 77 | + |
| 78 | + return cr |
0 commit comments