Skip to content

Commit

Permalink
Tidied up implementation of duplicate marking in GroupReadsByUmi.
Browse files Browse the repository at this point in the history
  • Loading branch information
tfenne committed Sep 28, 2023
1 parent c79d7bd commit 5a75724
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,12 @@ class ConsensusCallingIterator[ConsensusRead <: SimpleRead](sourceIterator: Iter
private var collectedStats: Boolean = false

protected val iter: Iterator[SamRecord] = {
val filteredIterator = sourceIterator.filterNot(r => r.secondary || r.supplementary)

// Wrap our input iterator in a progress logging iterator if we have a progress logger
val progressIterator = progress match {
case Some(p) => sourceIterator.tapEach { r => p.record(r) }
case None => sourceIterator
case Some(p) => filteredIterator.tapEach { r => p.record(r) }
case None => filteredIterator
}

// Then turn it into a grouping iterator
Expand Down
118 changes: 45 additions & 73 deletions src/main/scala/com/fulcrumgenomics/umi/GroupReadsByUmi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,14 @@ import com.fulcrumgenomics.util.Metric.{Count, Proportion}
import com.fulcrumgenomics.util.Sequences.countMismatches
import com.fulcrumgenomics.util._
import enumeratum.EnumEntry
import htsjdk.samtools.DuplicateScoringStrategy.ScoringStrategy
import htsjdk.samtools._
import htsjdk.samtools.util.SequenceUtil

import scala.collection.{BufferedIterator, Iterator, mutable}
import scala.collection.immutable.IndexedSeq
import scala.collection.mutable.ListBuffer
import scala.tools.nsc.doc.html.HtmlTags.B


object GroupReadsByUmi {
Expand Down Expand Up @@ -438,6 +440,9 @@ object Strategy extends FgBioEnum[Strategy] {
| 3. The assigned UMI tag
| 4. Read Name
|
|If the input is not template-coordinate sorted (i.e. `SO:unsorted GO:query SS:unsorted:template-coordinate`), then
|this tool will re-sort the input. The output will be written in template-coordinate order.
|
|During grouping, reads are filtered out if a) all reads with the same queryname are unmapped, b) any primary
|read has mapping quality < `min-map-q` (default=1), c) the primary mappings for R1 and R2 are on different
|chromosomes and `--allow-inter-contig` has been set to false., or d.) all non-primary reads are filtered,
Expand Down Expand Up @@ -470,43 +475,36 @@ object Strategy extends FgBioEnum[Strategy] {
|(i.e. does not count dashes and other non-ACGT characters). This option is not implemented for reads with UMI pairs
|(i.e. using the paired assigner).
|
|If mark duplicates flag is set, will set duplicate bit flag on all non-representative reads inside of a UMI group
|which are output.
|
|Strategies for choosing representative read is independent of UMI grouping strategy.
|
|Choosing representative read is preformed by one of three strategies:
|If the `--mark-duplicates` option is given, reads will also have their duplicate flag set in the BAM file.
|Each tag-family is treated separately, and a single template within the tag family is chosen to be the "unique"
|template and marked as non-duplicate, while all other templates in the tag family are then marked as duplicate.
|
|In each strategy, ties are broken by mapping quality
|Several parameters have different defaults depending on whether duplicates are being marked or not (all are
|directly settable on the command line):
|
|1. **SUM_OF_BASE_QUALITIES**: Sum base quality across entire read, the read with the highest combined base quality is chosen.
|2. **TOTAL_MAPPED_REFERENCE_LENGTH**: Sum mapped reference across each paired read ( if paired , the longest read is chosen.
|3. **RANDOM**: Choose a representative read at random.
|
|If the input is not template-coordinate sorted (i.e. `SO:unsorted GO:query SS:unsorted:template-coordinate`), then
|this tool will re-sort the input. The output will be written in template-coordinate order.
| 1. `--min-map-q` defaults to 0 in duplicate marking mode and 1 otherwise
| 2. `--include-secondary` defaults to true in duplicate marking mode and false otherwise
| 3. `--include-suppementary` defaults to true in duplicate marking mode and false otherwise
"""
)
class GroupReadsByUmi
( @arg(flag='i', doc="The input BAM file.") val input: PathToBam = Io.StdIn,
@arg(flag='o', doc="The output BAM file.") val output: PathToBam = Io.StdOut,
@arg(flag='f', doc="Optional output of tag family size counts.") val familySizeHistogram: Option[FilePath] = None,
@arg(flag='t', doc="The tag containing the raw UMI.") val rawTag: String = "RX",
@arg(flag='T', doc="The output tag for UMI grouping.") val assignTag: String = "MI",
@arg(flag='d', doc="Mark duplicates, duplicate bitflag will be set on non-representative reads.") val markDup: Boolean = false,
@arg(flag='D', doc="If -d is set, representative read assignment strategy, default = Sum Of Base Qualities.")
val dupStrategy: DuplicateScoringStrategy.ScoringStrategy = DuplicateScoringStrategy.ScoringStrategy.SUM_OF_BASE_QUALITIES,
@arg(flag='S', doc="Include secondary reads.") val includeSecondary: Boolean = false,
@arg(flag='U', doc="Include supplementary reads.") val includeSupplementary: Boolean = false,
@arg(flag='m', doc="Minimum mapping quality for mapped reads.") val minMapQ: Int = 1,
@arg(flag='n', doc="Include non-PF reads.") val includeNonPfReads: Boolean = false,
@arg(flag='s', doc="The UMI assignment strategy.") val strategy: Strategy,
@arg(flag='e', doc="The allowable number of edits between UMIs.") val edits: Int = 1,
@arg(flag='l', doc= """The minimum UMI length. If not specified then all UMIs must have the same length,
(@arg(flag='i', doc="The input BAM file.") val input: PathToBam = Io.StdIn,
@arg(flag='o', doc="The output BAM file.") val output: PathToBam = Io.StdOut,
@arg(flag='f', doc="Optional output of tag family size counts.") val familySizeHistogram: Option[FilePath] = None,
@arg(flag='t', doc="The tag containing the raw UMI.") val rawTag: String = "RX",
@arg(flag='T', doc="The output tag for UMI grouping.") val assignTag: String = "MI",
@arg(flag='d', doc="Turn on duplicate marking mode.") val markDuplicates: Boolean = false,
@arg(flag='S', doc="Include secondary reads.") val includeSecondary: Option[Boolean] = None,
@arg(flag='U', doc="Include supplementary reads.") val includeSupplementary: Option[Boolean] = None,
@arg(flag='m', doc="Minimum mapping quality for mapped reads.") val minMapQ: Option[Int] = None,
@arg(flag='n', doc="Include non-PF reads.") val includeNonPfReads: Boolean = false,
@arg(flag='s', doc="The UMI assignment strategy.") val strategy: Strategy,
@arg(flag='e', doc="The allowable number of edits between UMIs.") val edits: Int = 1,
@arg(flag='l', doc= """The minimum UMI length. If not specified then all UMIs must have the same length,
|otherwise discard reads with UMIs shorter than this length and allow for differing UMI lengths.
|""")
val minUmiLength: Option[Int] = None,
@arg(flag='x', doc= """
@arg(flag='x', doc= """
|DEPRECATED: this option will be removed in future versions and inter-contig reads will be
|automatically processed.""")
@deprecated val allowInterContig: Boolean = true
Expand All @@ -517,6 +515,10 @@ class GroupReadsByUmi

private val assigner = strategy.newStrategy(this.edits)

// Give values to unset parameters that are different in duplicate marking mode
private val _minMapQ = this.minMapQ.getOrElse(if (this.markDuplicates) 0 else 1)
private val _includeSecondaryReads = this.includeSecondary.getOrElse(this.markDuplicates)
private val _includeSupplementaryReads = this.includeSupplementary.getOrElse(this.markDuplicates)

/** Checks that the read's mapq is over a minimum, and if the read is paired, that the mate mapq is also over the min. */
private def mapqOk(rec: SamRecord, minMapQ: Int): Boolean = {
Expand Down Expand Up @@ -564,12 +566,12 @@ class GroupReadsByUmi
// Filter and sort the input BAM file
logger.info("Filtering the input.")
val filteredIterator = in.iterator
.filter(r => includeSecondary || !r.secondary )
.filter(r => includeSupplementary || !r.supplementary )
.filter(r => this._includeSecondaryReads || !r.secondary )
.filter(r => this._includeSupplementaryReads || !r.supplementary )
.filter(r => (includeNonPfReads || r.pf) || { filteredNonPf += 1; false })
.filter(r => (r.mapped || (r.paired && r.mateMapped)) || { filteredPoorAlignment += 1; false })
.filter(r => (allowInterContig || r.unpaired || r.refIndex == r.mateRefIndex) || { filteredPoorAlignment += 1; false })
.filter(r => mapqOk(r, this.minMapQ) || { filteredPoorAlignment += 1; false })
.filter(r => mapqOk(r, this._minMapQ) || { filteredPoorAlignment += 1; false })
.filter(r => !r.get[String](rawTag).exists(_.contains('N')) || { filteredNsInUmi += 1; false })
.filter { r =>
this.minUmiLength.forall { l =>
Expand Down Expand Up @@ -633,8 +635,8 @@ class GroupReadsByUmi
val templatesByMi = templates.groupBy { t => t.r1.get.apply[String](this.assignTag) }

// If marking duplicates, assign bitflag to all duplicate reads
if (this.markDup) {
templatesByMi.values.foreach(t => assignRepReadToGroup(t))
if (this.markDuplicates) {
templatesByMi.values.foreach(t => setDuplicateFlags(t))
}

// Then output the records in the right order (assigned tag, read name, r1, r2)
Expand Down Expand Up @@ -756,47 +758,17 @@ class GroupReadsByUmi
}
}

private def assignRepReadToGroup(group: Seq[Template]): Unit = {
//check all primary reads have SAME UMI defined
group.headOption match {
case Some(firstObject) =>
val groupUmi = firstObject.r1.get[String](this.assignTag)
group.foreach {
t =>
t.primaryReads.foreach { r =>
if (r.get[String](this.assignTag).getOrElse("") != groupUmi)
fail(s"Record '$r' has mismatched UMI from group '${groupUmi}'")
}
}
}
val deDupGroup = group.sortWith{ (t1, t2) =>
compareScore(t1.primaryReads, t2.primaryReads)
/** Sets the duplicate flags on all reads within all templates. */
private def setDuplicateFlags(group: Seq[Template]): Unit = {
val nonDuplicateTemplate = group.minBy { template =>
template.primaryReads.sumBy { r =>
DuplicateScoringStrategy.computeDuplicateScore(r.asSam, ScoringStrategy.SUM_OF_BASE_QUALITIES)
}
}
//0 element is highest scoring item, therefor is the representative read.
//mark all SamRecords in other elements as duplicate.
deDupGroup.slice(1,deDupGroup.length).foreach { t => t.allReads.foreach( r => r.duplicate = true) }
}

private def compareScore(priRead1: Iterator[SamRecord], priRead2: Iterator[SamRecord]): Boolean = {
var score1 = 0
var score2 = 0
var r1MapQ = 0
var r2MapQ = 0
priRead1.foreach{ r =>
score1 = score1 + DuplicateScoringStrategy.computeDuplicateScore(r.asSam, this.dupStrategy, true)
r1MapQ = r1MapQ + r.mapq
}
priRead2.foreach { r =>
score2 = score2 + DuplicateScoringStrategy.computeDuplicateScore(r.asSam, this.dupStrategy, true)
r2MapQ = r2MapQ + r.mapq
}
if (score1 == score2) {
// tie break on mapQ
r1MapQ > r2MapQ
}
else {
score1 > score2
group.foreach { template =>
val flag = !(template eq nonDuplicateTemplate)
template.allReads.foreach(_.duplicate = flag)
}
}

}
11 changes: 5 additions & 6 deletions src/test/scala/com/fulcrumgenomics/umi/GroupReadsByUmiTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ class GroupReadsByUmiTest extends UnitSpec with OptionValues with PrivateMethodT
val in = builder.toTempFile()
val out = Files.createTempFile("umi_grouped.", ".sam")
val hist = Files.createTempFile("umi_grouped.", ".histogram.txt")
val tool = new GroupReadsByUmi(input=in, output=out, familySizeHistogram=Some(hist), rawTag="RX", assignTag="MI", strategy=Strategy.Edit, edits=1, minMapQ=30)
val tool = new GroupReadsByUmi(input=in, output=out, familySizeHistogram=Some(hist), rawTag="RX", assignTag="MI", strategy=Strategy.Edit, edits=1, minMapQ=Some(30))
val logs = executeFgbioTool(tool)

val groups = readBamRecs(out).groupBy(_.name.charAt(0))
Expand Down Expand Up @@ -267,10 +267,9 @@ class GroupReadsByUmiTest extends UnitSpec with OptionValues with PrivateMethodT
val in = builder.toTempFile()
val out = Files.createTempFile("umi_grouped.", ".sam")
val hist = Files.createTempFile("umi_grouped.", ".histogram.txt")
val gr = new GroupReadsByUmi(minMapQ = 0, input = in, output = out, familySizeHistogram = Some(hist), rawTag = "RX", assignTag = "MI", strategy = Strategy.Paired, edits = 1, markDup = true)
val gr = new GroupReadsByUmi(input=in, output=out, familySizeHistogram=Some(hist), strategy=Strategy.Paired, edits=1, markDuplicates=true)

gr.markDup shouldBe true
gr.dupStrategy shouldBe DuplicateScoringStrategy.ScoringStrategy.SUM_OF_BASE_QUALITIES
gr.markDuplicates shouldBe true

val recs = readBamRecs(out)
recs.filter(_.name.equals("a01")).forall(_.duplicate == true) shouldBe true
Expand All @@ -290,7 +289,7 @@ class GroupReadsByUmiTest extends UnitSpec with OptionValues with PrivateMethodT
val in = builder.toTempFile()
val out = Files.createTempFile("umi_grouped.", ".sam")
val hist = Files.createTempFile("umi_grouped.", ".histogram.txt")
new GroupReadsByUmi(minMapQ = 0, input = in, output = out, familySizeHistogram = Some(hist), rawTag = "RX", assignTag = "MI", strategy = Strategy.Paired, edits = 1).execute()
new GroupReadsByUmi(input=in, output=out, familySizeHistogram=Some(hist), strategy=Strategy.Paired, edits=1).execute()

val recs = readBamRecs(out)
recs.filter(_.name.equals("a01")).forall(_.duplicate == false) shouldBe true
Expand All @@ -309,7 +308,7 @@ class GroupReadsByUmiTest extends UnitSpec with OptionValues with PrivateMethodT
val in = builder.toTempFile()
val out = Files.createTempFile("umi_grouped.", ".sam")
val hist = Files.createTempFile("umi_grouped.", ".histogram.txt")
new GroupReadsByUmi(input = in, output = out, familySizeHistogram = Some(hist), rawTag = "RX", assignTag = "MI", strategy = Strategy.Edit, edits = 1, markDup = true).execute()
new GroupReadsByUmi(input = in, output = out, familySizeHistogram = Some(hist), rawTag = "RX", assignTag = "MI", strategy = Strategy.Edit, edits = 1, markDuplicates = true).execute()

val recs = readBamRecs(out)
recs.filter(_.name.equals("a01")).forall(_.duplicate == false) shouldBe true
Expand Down

0 comments on commit 5a75724

Please sign in to comment.