diff --git a/src/main/java/org/broadinstitute/hellbender/tools/walkers/featuremapping/FlowFeatureMapper.java b/src/main/java/org/broadinstitute/hellbender/tools/walkers/featuremapping/FlowFeatureMapper.java index 19783e1984f..021838833c2 100644 --- a/src/main/java/org/broadinstitute/hellbender/tools/walkers/featuremapping/FlowFeatureMapper.java +++ b/src/main/java/org/broadinstitute/hellbender/tools/walkers/featuremapping/FlowFeatureMapper.java @@ -17,23 +17,31 @@ import org.broadinstitute.barclay.help.DocumentedFeature; import org.broadinstitute.hellbender.cmdline.StandardArgumentDefinitions; import org.broadinstitute.hellbender.cmdline.programgroups.FlowBasedProgramGroup; -import org.broadinstitute.hellbender.engine.*; +import org.broadinstitute.hellbender.engine.FeatureContext; +import org.broadinstitute.hellbender.engine.GATKPath; +import org.broadinstitute.hellbender.engine.ReferenceContext; import org.broadinstitute.hellbender.exceptions.GATKException; import org.broadinstitute.hellbender.tools.FlowBasedArgumentCollection; -import org.broadinstitute.hellbender.tools.walkers.haplotypecaller.*; +import org.broadinstitute.hellbender.tools.walkers.haplotypecaller.AssemblyBasedCallerArgumentCollection; +import org.broadinstitute.hellbender.tools.walkers.haplotypecaller.HaplotypeCallerArgumentCollection; +import org.broadinstitute.hellbender.tools.walkers.haplotypecaller.ReferenceConfidenceMode; import org.broadinstitute.hellbender.utils.SimpleInterval; import org.broadinstitute.hellbender.utils.Utils; +import org.broadinstitute.hellbender.utils.fasta.CachingIndexedFastaSequenceFile; +import org.broadinstitute.hellbender.utils.haplotype.FlowBasedHaplotype; import org.broadinstitute.hellbender.utils.haplotype.Haplotype; +import org.broadinstitute.hellbender.utils.read.FlowBasedRead; import org.broadinstitute.hellbender.utils.read.FlowBasedReadUtils; import org.broadinstitute.hellbender.utils.read.GATKRead; +import org.broadinstitute.hellbender.utils.read.SAMRecordToGATKReadAdapter; import org.broadinstitute.hellbender.utils.variant.GATKVCFConstants; import org.broadinstitute.hellbender.utils.variant.GATKVCFHeaderLines; import org.broadinstitute.hellbender.utils.variant.GATKVariantContextUtils; import org.broadinstitute.hellbender.utils.variant.writers.GVCFWriter; -import org.broadinstitute.hellbender.utils.haplotype.FlowBasedHaplotype; -import org.broadinstitute.hellbender.utils.read.FlowBasedRead; import java.util.*; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.function.Supplier; /** @@ -81,7 +89,11 @@ @DocumentedFeature @ExperimentalFeature -public final class FlowFeatureMapper extends ReadWalker { +public final class FlowFeatureMapper extends ThreadedReadWalker { + + public static final int CAPACITY1 = 5000; + private static final int HAPLOTYPE_OPTIMIZED_SIZE = 10; + private static final long CACHE_SIZE_FACTOR = 5; static class CopyAttrInfo { public final String name; @@ -163,6 +175,15 @@ public CopyAttrInfo(final String spec) { @ArgumentCollection public FlowBasedArgumentCollection fbargs = new FlowBasedArgumentCollection(); + // emit() message and queue + static class WriterMessage { + MappedFeature mappedFeature; + } + private LinkedBlockingQueue writerQueue; + private Thread writerWorker; + @Argument(fullName = "threaded-writer", doc = "turn threaded writer on?", optional = true) + public boolean threadedWriter = false; + protected static class ReadContext implements Comparable { final GATKRead read; final ReferenceContext referenceContext; @@ -186,6 +207,7 @@ public int compareTo(ReadContext o) { protected static class MappedFeature implements Comparable { GATKRead read; + ReferenceContext referenceContext; FlowFeatureMapperArgumentCollection.MappingFeatureEnum type; byte[] readBases; byte[] refBases; @@ -197,7 +219,7 @@ protected static class MappedFeature implements Comparable { int filteredCount; int nonIdentMBasesOnRead; int featuresOnRead; - int refEditDistance; + Supplier refEditDistance; int index; int smqLeft; int smqRight; @@ -207,9 +229,10 @@ protected static class MappedFeature implements Comparable { double scoreForBase[]; boolean adjacentRefDiff; - public MappedFeature(GATKRead read, FlowFeatureMapperArgumentCollection.MappingFeatureEnum type, byte[] readBases, + public MappedFeature(GATKRead read, ReferenceContext referenceContext, FlowFeatureMapperArgumentCollection.MappingFeatureEnum type, byte[] readBases, byte[] refBases, int readBasesOffset, int start, int offsetDelta) { this.read = read; + this.referenceContext = referenceContext; this.type = type; this.readBases = readBases; this.refBases = refBases; @@ -218,11 +241,12 @@ public MappedFeature(GATKRead read, FlowFeatureMapperArgumentCollection.MappingF this.offsetDelta = offsetDelta; } - static MappedFeature makeSNV(GATKRead read, int offset, byte refBase, int start, int offsetDelta) { + static MappedFeature makeSNV(GATKRead read, int offset, byte refBase, int start, int offsetDelta, ReferenceContext referenceContext) { byte[] readBases = {read.getBasesNoCopy()[offset]}; byte[] refBases = {refBase}; return new MappedFeature( read, + referenceContext, FlowFeatureMapperArgumentCollection.MappingFeatureEnum.SNV, readBases, refBases, @@ -255,6 +279,112 @@ public int compareTo(MappedFeature o) { } } + // private utility class, for generating haplotypes and a read spanning the area around the feature + private class FeatureHaplotypes { + + final FlowBasedReadUtils.ReadGroupInfo rgInfo; + final FlowBasedHaplotype[] haplotypes = new FlowBasedHaplotype[2]; + final int fromBaseIndex; + final int toBaseIndex; + + private FeatureHaplotypes(MappedFeature fr, byte altBase) { + + // build bases for flow haplotypes + // NOTE!!!: this code assumes length of feature on read and reference is the same + // this is true for SNP but not for INDELs - it will have to be re-written! + // TODO: write for INDEL + byte[] bases = fr.read.getBasesNoCopy(); + int offset = fr.readBasesOffset; + int refStart = fr.start; + int refModOfs = 0; + + // install alt base? + byte orgBase = 0; + if ( altBase != 0 ) { + orgBase = fr.refBases[0]; + fr.refBases[0] = altBase; + } + + int altOffset = offset; + if ( offset > 0 ) { + // reach into hmer before + offset--; + refModOfs++; + refStart--; + + // extend until start of hmer + final byte hmerBase = bases[offset]; + while ( offset > 0 && bases[offset-1] == hmerBase ) { + offset--; + refModOfs++; + refStart--; + } + } + + // build two haplotypes with existing data from the read + final int trimTo = Math.min(altOffset + fr.readBases.length + HAPLOTYPE_OPTIMIZED_SIZE, bases.length); + final byte[] sAltBases = Arrays.copyOfRange(bases, offset, trimTo); + final byte[] sRefBases = Arrays.copyOf(sAltBases, sAltBases.length); + + // verify that we are correctly positioned + if ( sRefBases[refModOfs] != fr.readBases[0] ) { + logger.warn("sRefBases[refModOfs] != fr.readBases[0] : " + sRefBases[refModOfs] + " != " + fr.readBases[0]); + } + + // restore reference in ref haplotype + System.arraycopy(fr.refBases, 0, sRefBases, refModOfs, fr.refBases.length); + + // construct haplotypes + final SimpleInterval genomeLoc = new SimpleInterval(fr.read.getContig(), refStart, refStart + sAltBases.length - 1); + final Cigar cigar = new Cigar(); + cigar.add(new CigarElement(sAltBases.length, CigarOperator.M)); + final Haplotype altHaplotype = new Haplotype(sAltBases, false); + final Haplotype refHaplotype = new Haplotype(sRefBases, true); + altHaplotype.setGenomeLocation(genomeLoc); + refHaplotype.setGenomeLocation(genomeLoc); + altHaplotype.setCigar(cigar); + refHaplotype.setCigar(cigar); + + // prepare flow based haplotypes + rgInfo = FlowBasedReadUtils.getReadGroupInfo(getHeaderForReads(), fr.read); + haplotypes[0] = new FlowBasedHaplotype(altHaplotype, rgInfo.flowOrder); + haplotypes[1] = new FlowBasedHaplotype(refHaplotype, rgInfo.flowOrder); + + // restore changes + if ( altBase != 0 ) { + fr.refBases[0] = orgBase; + } + + // establish span + fromBaseIndex = offset; + toBaseIndex = trimTo - 1; + } + + private FlowBasedRead spanningRead(final GATKRead read) { + + // build read + final SAMRecord samRecord = new SAMRecord(getHeaderForReads()); + samRecord.setReadBases(Arrays.copyOfRange(read.getBasesNoCopy(), fromBaseIndex, toBaseIndex + 1)); + samRecord.setBaseQualities(Arrays.copyOfRange(read.getBaseQualitiesNoCopy(), fromBaseIndex, toBaseIndex + 1)); + samRecord.setAttribute("tp", Arrays.copyOfRange(read.getAttributeAsByteArray("tp"), fromBaseIndex, toBaseIndex + 1)); + if ( read.hasAttribute("t0") ) { + samRecord.setAttribute("t0", read.getAttributeAsString("t0").substring(fromBaseIndex, toBaseIndex + 1)); + } + samRecord.setAttribute("RG", read.getAttributeAsString("RG")); + samRecord.setCigarString("" + (toBaseIndex - fromBaseIndex + 1) + "M"); + final GATKRead gatkRead = new SAMRecordToGATKReadAdapter(samRecord); + + // build flow based read + final FlowBasedRead flowRead = new FlowBasedRead(gatkRead, rgInfo.flowOrder, rgInfo.maxClass, fbargs); + + return flowRead; + } + + public FlowBasedHaplotype[] spanningHaplotypes() { + return haplotypes; + } + } + // locals private VariantContextWriter vcfWriter; final private PriorityQueue featureQueue = new PriorityQueue<>(); @@ -276,12 +406,55 @@ public void onTraversalStart() { final SAMSequenceDictionary sequenceDictionary = getHeaderForReads().getSequenceDictionary(); vcfWriter = makeVCFWriter(outputVCF, sequenceDictionary, createOutputVariantIndex, createOutputVariantMD5, outputSitesOnlyVCFs); vcfWriter.writeHeader(makeVCFHeader(sequenceDictionary, getDefaultToolVCFHeaderLines())); + + // threaded writer? + if ( threadedWriter ) { + writerQueue = new LinkedBlockingQueue<>(CAPACITY1); + writerWorker = new Thread(new Runnable() { + @Override + public void run() { + WriterMessage message; + while ( true ) { + try { + message = writerQueue.take(); + if ( message.mappedFeature == null ) { + break; + } + emitFeature(message.mappedFeature); + } catch (InterruptedException e) { + throw new GATKException("", e); + } + } + } + }); + writerWorker.setUncaughtExceptionHandler(getUncaughtExceptionHandler()); + writerWorker.start();; + } + + // if using threads, set handler for this thread as well + if ( threadedWalker || threadedWriter ) { + Thread.currentThread().setUncaughtExceptionHandler(getUncaughtExceptionHandler()); + } + + // if using threaded walker extend reference cache to avoid thrashing + if ( threadedWalker ) { + CachingIndexedFastaSequenceFile.requestedCacheSize = CachingIndexedFastaSequenceFile.DEFAULT_CACHE_SIZE * CACHE_SIZE_FACTOR; + } + } @Override public void closeTool() { - flushQueue(null, null); super.closeTool(); + flushQueue(null, null); + if ( threadedWriter ) { + try { + writerQueue.put(new WriterMessage()); + writerWorker.join(); + } catch (InterruptedException e) { + throw new GATKException("", e); + } + } if ( vcfWriter != null ) { vcfWriter.close(); } @@ -376,23 +549,30 @@ private String scoreNameForBase(int baseIndex) { } @Override - public void apply(final GATKRead read, final ReferenceContext referenceContext, final FeatureContext featureContext) { + public boolean acceptRead(GATKRead read, ReferenceContext referenceContext, FeatureContext featureContext) { // include dups? if ( read.isDuplicate() && !fmArgs.includeDupReads ) { - return; + return false; } // include supplementary alignments? if ( read.isSupplementaryAlignment() && !fmArgs.keepSupplementaryAlignments ) { - return; + return false; } // include qc-failed reads? if ( ((read.getFlags() & VENDOR_QUALITY_CHECK_FLAG) != 0) && !includeQcFailedReads ) { - return; + return false; } + return true; + } + + + @Override + public void applyPossiblyThreaded(final GATKRead read, final ReferenceContext referenceContext, final FeatureContext featureContext) { + // flush qeues up to this read flushQueue(read, referenceContext); @@ -433,7 +613,17 @@ private void flushQueue(final GATKRead read, final ReferenceContext referenceCon while ( featureQueue.size() != 0 ) { final MappedFeature fr = featureQueue.poll(); enrichFeature(fr); - emitFeature(fr); + if ( !threadedWriter ) { + emitFeature(fr); + } else { + try { + WriterMessage message = new WriterMessage(); + message.mappedFeature = fr; + writerQueue.put(message); + } catch (InterruptedException e) { + throw new GATKException("", e); + } + } } } else { // enter read into the queue @@ -446,7 +636,17 @@ private void flushQueue(final GATKRead read, final ReferenceContext referenceCon || (fr.start < read.getStart()) ) { fr = featureQueue.poll(); enrichFeature(fr); - emitFeature(fr); + if ( !threadedWriter ) { + emitFeature(fr); + } else { + try { + WriterMessage message = new WriterMessage(); + message.mappedFeature = fr; + writerQueue.put(message); + } catch (InterruptedException e) { + throw new GATKException("", e); + } + } } else { break; @@ -486,19 +686,21 @@ private void enrichFeature(final MappedFeature fr) { private double scoreFeature(final MappedFeature fr) { return scoreFeature(fr, (byte)0); } + private double scoreFeature(final MappedFeature fr, byte altBase) { // build haplotypes - final FlowBasedReadUtils.ReadGroupInfo rgInfo = FlowBasedReadUtils.getReadGroupInfo(getHeaderForReads(), fr.read); - final FlowBasedHaplotype[] haplotypes = buildHaplotypes(fr, rgInfo.flowOrder, altBase); - - // create flow read - final FlowBasedRead flowRead = new FlowBasedRead(fr.read, rgInfo.flowOrder, rgInfo.maxClass, fbargs); - - final int diffLeft = haplotypes[0].getStart() - flowRead.getStart() + fr.offsetDelta; - final int diffRight = flowRead.getEnd() - haplotypes[0].getEnd(); - flowRead.applyBaseClipping(Math.max(0, diffLeft), Math.max(diffRight, 0), false); + final FeatureHaplotypes featureHaplotypes = new FeatureHaplotypes(fr, altBase); + final FlowBasedHaplotype[] haplotypes = featureHaplotypes.spanningHaplotypes(); + final FlowBasedRead flowRead = featureHaplotypes.spanningRead(fr.read); + // check lengths + if ( haplotypes[0].length() != haplotypes[1].length() ) { + logger.warn("haplotypes are different in length: " + haplotypes[0].length() + " != " + haplotypes[1].length()); + } + if ( flowRead.getLength() != haplotypes[0].length() ) { + logger.warn("flow read is different in length from haplotypes: " + flowRead.getLength() + " != " + haplotypes[0].length()); + } if ( !flowRead.isValid() ) { return -1; } @@ -527,19 +729,6 @@ private double scoreFeature(final MappedFeature fr, byte altBase) { logger.info("refrHapKey: " + haplotypes[1].getKeyLength() + " " + Arrays.toString(haplotypes[1].getKey())); computeLikelihoodLocal(flowRead, haplotypes[1], hapKeyLength, true); logger.info("score: " + score); - - // analyze read - final FlowBasedRead flowRead2 = new FlowBasedRead(fr.read, rgInfo.flowOrder, rgInfo.maxClass, fbargs); - final int[] key2 = flowRead2.getKey(); - for ( int i = 0 ; i < key2.length ; i++ ) { - final double p1 = flowRead2.getProb(i, key2[i]); - for ( int j = 0 ; j < rgInfo.maxClass ; j++ ) { - final double p2 = flowRead2.getProb(i, j); - if ( p2 > p1 ) - logger.info(String.format("prob at %s key[%d]=%d, %f is lower than at %d which is %f", - flowRead2.getName(), i, key2[i], p1, j, p2)); - } - } } if ( score < 0 && !fmArgs.keepNegatives && score != -1.0 ) { @@ -565,7 +754,7 @@ public static double computeLikelihoodLocal(final FlowBasedRead read, final Flow // debug support StringBuffer debugMessage = null; if ( debug ) - debugMessage = new StringBuffer(Integer.toString(startingPoint) + " hmer prob |"); + debugMessage = new StringBuffer(startingPoint + " hmer prob |"); double result = 0 ; for (int i = 0; i < read.getKeyLength(); i++) { int index = i + startingPoint; @@ -598,68 +787,6 @@ public static double computeLikelihoodLocal(final FlowBasedRead read, final Flow return result; } - private FlowBasedHaplotype[] buildHaplotypes(final MappedFeature fr, final String flowOrder, byte altBase) { - - // build bases for flow haplotypes - // NOTE!!!: this code assumes length of feature on read and reference is the same - // this is true for SNP but not for INDELs - it will have to be re-written! - // TODO: write for INDEL - byte[] bases = fr.read.getBasesNoCopy(); - int offset = fr.readBasesOffset; - int refStart = fr.start; - int refModOfs = 0; - - // install alt base? - byte orgBase = 0; - if ( altBase != 0 ) { - orgBase = fr.refBases[0]; - fr.refBases[0] = altBase; - } - - if ( offset > 0 ) { - // reach into hmer before - offset--; - refModOfs++; - refStart--; - - // extend until start of hmer - final byte hmerBase = bases[offset]; - while ( offset > 0 && bases[offset-1] == hmerBase ) { - offset--; - refModOfs++; - refStart--; - } - } - final byte[] sAltBases = Arrays.copyOfRange(bases, offset, bases.length); - final byte[] sRefBases = Arrays.copyOf(sAltBases, sAltBases.length); - System.arraycopy(fr.refBases, 0, sRefBases, refModOfs, fr.refBases.length); - - // construct haplotypes - final SimpleInterval genomeLoc = new SimpleInterval(fr.read.getContig(), refStart, refStart + sAltBases.length - 1); - final Cigar cigar = new Cigar(); - cigar.add(new CigarElement(sAltBases.length, CigarOperator.M)); - final Haplotype altHaplotype = new Haplotype(sAltBases, false); - final Haplotype refHaplotype = new Haplotype(sRefBases, true); - altHaplotype.setGenomeLocation(genomeLoc); - refHaplotype.setGenomeLocation(genomeLoc); - altHaplotype.setCigar(cigar); - refHaplotype.setCigar(cigar); - - // prepare flow based haplotypes - final FlowBasedHaplotype[] result = { - new FlowBasedHaplotype(altHaplotype, flowOrder), - new FlowBasedHaplotype(refHaplotype, flowOrder) - }; - - // restore changes - if ( altBase != 0 ) { - fr.refBases[0] = orgBase; - } - - // return - return result; - } - private boolean filterFeature(final MappedFeature fr) { if ( fmArgs.excludeNaNScores && Double.isNaN(fr.score) ) { @@ -703,7 +830,13 @@ private void emitFeature(final MappedFeature fr) { vcb.attribute(VCF_FC1, fr.nonIdentMBasesOnRead); vcb.attribute(VCF_FC2, fr.featuresOnRead); vcb.attribute(VCF_LENGTH, fr.read.getLength()); - vcb.attribute(VCF_EDIST, fr.refEditDistance); + if ( !fmArgs.levenshteinEditDistance ) { + int nmScore = SequenceUtil.calculateSamNmTag(fr.read.convertToSAMRecord(getHeaderForReads()), fr.referenceContext.getBases(new SimpleInterval(fr.read)), fr.read.getStart() - 1); + vcb.attribute(VCF_EDIST, nmScore); + } else if (fr.refEditDistance != null) { + int editDisance = fr.refEditDistance.get(); // this actually computes the distance + vcb.attribute(VCF_EDIST, editDisance); + } vcb.attribute(VCF_INDEX, fr.index); // median/mean quality on? diff --git a/src/main/java/org/broadinstitute/hellbender/tools/walkers/featuremapping/FlowFeatureMapperArgumentCollection.java b/src/main/java/org/broadinstitute/hellbender/tools/walkers/featuremapping/FlowFeatureMapperArgumentCollection.java index 03660746d2b..ac652bf48b6 100644 --- a/src/main/java/org/broadinstitute/hellbender/tools/walkers/featuremapping/FlowFeatureMapperArgumentCollection.java +++ b/src/main/java/org/broadinstitute/hellbender/tools/walkers/featuremapping/FlowFeatureMapperArgumentCollection.java @@ -89,6 +89,12 @@ enum MappingFeatureEnum { @Argument(fullName = "keep-supplementary-alignments", doc = "keep supplementary alignments ?", optional = true) public boolean keepSupplementaryAlignments = false; + /** + * edit distance should come from computed NM + **/ + @Argument(fullName = "levenshtein-edit-distance", doc = "edit distance should come from a Levenshtein edit distance instead of NM ?", optional = true) + public boolean levenshteinEditDistance = false; + /** * debug negatives? **/ diff --git a/src/main/java/org/broadinstitute/hellbender/tools/walkers/featuremapping/SNVMapper.java b/src/main/java/org/broadinstitute/hellbender/tools/walkers/featuremapping/SNVMapper.java index e67f5ee570e..8da3ae82c79 100644 --- a/src/main/java/org/broadinstitute/hellbender/tools/walkers/featuremapping/SNVMapper.java +++ b/src/main/java/org/broadinstitute/hellbender/tools/walkers/featuremapping/SNVMapper.java @@ -70,7 +70,6 @@ public void forEachOnRead(GATKRead read, ReferenceContext referenceContext, Cons } else { basesString = new String(Arrays.copyOfRange(bases, startSoftClip, bases.length - endSoftClip)); } - int refEditDistance = levDistance.apply(basesString, new String(ref)); // count bases delta on M cigar elements int nonIdentMBases = 0; @@ -140,11 +139,11 @@ public void forEachOnRead(GATKRead read, ReferenceContext referenceContext, Cons } // add this feature - FlowFeatureMapper.MappedFeature feature = FlowFeatureMapper.MappedFeature.makeSNV(read, readOfs, ref[refOfs], referenceContext.getStart() + refOfs, readOfs - refOfs); + FlowFeatureMapper.MappedFeature feature = FlowFeatureMapper.MappedFeature.makeSNV(read, readOfs, ref[refOfs], referenceContext.getStart() + refOfs, readOfs - refOfs, referenceContext); if ( (fmArgs.reportAllAlts || fmArgs.tagBasesWithAdjacentRefDiff) && !surrounded ) feature.adjacentRefDiff = true; feature.nonIdentMBasesOnRead = nonIdentMBases; - feature.refEditDistance = refEditDistance; + feature.refEditDistance = () -> levDistance.apply(basesString, new String(ref)); if ( !read.isReverseStrand() ) feature.index = readOfs; else diff --git a/src/main/java/org/broadinstitute/hellbender/tools/walkers/featuremapping/ThreadedReadWalker.java b/src/main/java/org/broadinstitute/hellbender/tools/walkers/featuremapping/ThreadedReadWalker.java new file mode 100644 index 00000000000..5758cdf1542 --- /dev/null +++ b/src/main/java/org/broadinstitute/hellbender/tools/walkers/featuremapping/ThreadedReadWalker.java @@ -0,0 +1,116 @@ +package org.broadinstitute.hellbender.tools.walkers.featuremapping; + +import org.broadinstitute.barclay.argparser.Argument; +import org.broadinstitute.hellbender.engine.FeatureContext; +import org.broadinstitute.hellbender.engine.ReadWalker; +import org.broadinstitute.hellbender.engine.ReferenceContext; +import org.broadinstitute.hellbender.exceptions.GATKException; +import org.broadinstitute.hellbender.utils.read.GATKRead; + +import java.util.concurrent.LinkedBlockingQueue; + +public abstract class ThreadedReadWalker extends ReadWalker implements Runnable { + + public static final int CAPACITY = 5000; + + // apply() message and queue - carrying read/referenceContext tuples + static class ApplyMessage { + GATKRead read; + ReferenceContext referenceContext; + FeatureContext featureContext; + } + + private LinkedBlockingQueue applyQueue; + private Thread worker; + + /** + * turn threaded walker on + **/ + @Argument(fullName = "threaded-walker", doc = "turn threaded walker on?", optional = true) + public boolean threadedWalker = false; + + + @Override + public void onTraversalStart() { + + // if running in threaded mode, start worker thread + if ( threadedWalker ) { + applyQueue = new LinkedBlockingQueue<>(CAPACITY); + worker = new Thread(this); + worker.setUncaughtExceptionHandler(getUncaughtExceptionHandler()); + worker.start(); + } + } + + @Override + public void closeTool() { + + // if running in threaded mode, signal end of input and wait for thread to complete + if ( threadedWalker ) { + try { + applyQueue.put(new ApplyMessage()); + worker.join(); + } catch (InterruptedException e) { + throw new GATKException("", e ); + } + } + } + + @Override + final public void apply(final GATKRead read, final ReferenceContext referenceContext, final FeatureContext featureContext) { + + if ( !acceptRead(read, referenceContext, featureContext) ) { + return; + } + + if ( threadedWalker ) { + // redirect message into queue + ApplyMessage message = new ApplyMessage(); + message.read = read; + message.referenceContext = referenceContext; + message.featureContext = featureContext; + try { + applyQueue.put(message); + } catch (InterruptedException e) { + throw new GATKException("", e); + } + } else { + applyPossiblyThreaded(read, referenceContext, featureContext); + } + } + + @Override + public void run() { + + // loop on messages + ApplyMessage message; + while ( true ) { + // take next message off the queue + try { + message = applyQueue.take(); + } catch (InterruptedException e) { + throw new GATKException("", e); + } + + // end of stream? + if ( message.read == null ) { + break; + } + + // process message + applyPossiblyThreaded(message.read, message.referenceContext, message.featureContext); + } + } + + public abstract boolean acceptRead(GATKRead read, ReferenceContext referenceContext, FeatureContext featureContext); + public abstract void applyPossiblyThreaded(final GATKRead read, final ReferenceContext referenceContext, final FeatureContext featureContext); + + public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() { + Thread.UncaughtExceptionHandler handler = (th, ex) -> { + System.out.println("Uncaught exception: " + ex); + ex.printStackTrace(System.out); + System.exit(-1); + }; + return handler; + } +} diff --git a/src/main/java/org/broadinstitute/hellbender/utils/fasta/CachingIndexedFastaSequenceFile.java b/src/main/java/org/broadinstitute/hellbender/utils/fasta/CachingIndexedFastaSequenceFile.java index 6cd2d678347..0d8d8f30e81 100644 --- a/src/main/java/org/broadinstitute/hellbender/utils/fasta/CachingIndexedFastaSequenceFile.java +++ b/src/main/java/org/broadinstitute/hellbender/utils/fasta/CachingIndexedFastaSequenceFile.java @@ -49,6 +49,9 @@ public final class CachingIndexedFastaSequenceFile implements ReferenceSequenceF /** The default cache size in bp */ public static final long DEFAULT_CACHE_SIZE = 1000000; + /** Requested cache size in bp - will be respected if set */ + public static long requestedCacheSize = 0; + /** The cache size of this CachingIndexedFastaSequenceFile */ private final long cacheSize; @@ -236,9 +239,14 @@ public long getCacheMisses() { * @return the size of the cache we are using */ public long getCacheSize() { - return cacheSize; + return requestedCacheSize != 0 ? requestedCacheSize : cacheSize; + } + + private long getCacheMissBackup() { + return Math.max(getCacheSize() / 1000, 1); } + /** * Is this CachingIndexedFastaReader keeping the original case of bases in the fasta, or is * everything being made upper case? @@ -315,10 +323,10 @@ public ReferenceSequence getSequence(String contig) { * all of the bases in the ReferenceSequence returned by this method will be upper cased. */ @Override - public ReferenceSequence getSubsequenceAt( final String contig, long start, final long stop ) { + public synchronized ReferenceSequence getSubsequenceAt( final String contig, long start, final long stop ) { final ReferenceSequence result; - if ( (stop - start + 1) > cacheSize ) { + if ( (stop - start + 1) > getCacheSize() ) { cacheMisses++; result = sequenceFile.getSubsequenceAt(contig, start, stop); if ( ! preserveCase ) StringUtil.toUpperCase(result.getBases()); @@ -335,8 +343,8 @@ public ReferenceSequence getSubsequenceAt( final String contig, long start, fina if ( start < cache.start || stop > cache.stop || cache.seq == null || cache.seq.getContigIndex() != contigInfo.getSequenceIndex() ) { cacheMisses++; - cache.start = Math.max(start - cacheMissBackup, 0); - cache.stop = Math.min(start + cacheSize + cacheMissBackup, contigInfo.getSequenceLength()); + cache.start = Math.max(start - getCacheMissBackup(), 0); + cache.stop = Math.min(start + getCacheSize() + getCacheMissBackup(), contigInfo.getSequenceLength()); cache.seq = sequenceFile.getSubsequenceAt(contig, cache.start, cache.stop); // convert all of the bases in the sequence to upper case if we aren't preserving cases diff --git a/src/test/java/org/broadinstitute/hellbender/tools/walkers/featuremapping/FlowFeatureMapperIntegrationTest.java b/src/test/java/org/broadinstitute/hellbender/tools/walkers/featuremapping/FlowFeatureMapperIntegrationTest.java index 19d8cb8fc3b..5069603ff44 100644 --- a/src/test/java/org/broadinstitute/hellbender/tools/walkers/featuremapping/FlowFeatureMapperIntegrationTest.java +++ b/src/test/java/org/broadinstitute/hellbender/tools/walkers/featuremapping/FlowFeatureMapperIntegrationTest.java @@ -220,4 +220,105 @@ public void testTagBasesWithAdjacentRefDiff() throws IOException { } } + @Test + public void testLevenshteinEditDistance() throws IOException { + + final File outputDir = createTempDir("testFlowFeatureMapperTest"); + final File expectedFile = new File(testDir + "/snv_feature_mapper_nm_edit_disance_output.vcf"); + final File outputFile = UPDATE_EXACT_MATCH_EXPECTED_OUTPUTS ? expectedFile : new File(outputDir + "/snv_feature_mapper_nm_edit_disance_output.vcf"); + + final String[] args = new String[] { + "-R", largeFileTestDir + "/Homo_sapiens_assembly38.fasta.gz", + "-O", outputFile.getAbsolutePath(), + "-I", testDir + "/snv_feature_mapper_input.bam", + "--copy-attr", "RG", + "--copy-attr", "AS,Integer,AS attribute, as copied", + "--copy-attr", "rq,Float", + "--limit-score", "100", + "--min-score", "0", + "--snv-identical-bases", "10", + "--debug-negatives", "false", + "--debug-read-name", "150451-BC94-0645901755", + "--levenshtein-edit-distance" + }; + + // run the tool + runCommandLine(args); // no assert, just make sure we don't throw + + // make sure we've generated the otuput file + Assert.assertTrue(outputFile.exists()); + + // walk the output and expected files, compare non-comment lines + if ( !UPDATE_EXACT_MATCH_EXPECTED_OUTPUTS ) { + IntegrationTestSpec.assertEqualTextFiles(outputFile, expectedFile, "#"); + } + } + + @Test + public void testMapperThread() throws IOException { + + final File outputDir = createTempDir("testFlowFeatureMapperTest"); + final File expectedFile = new File(testDir + "/snv_feature_mapper_output.vcf"); + final File outputFile = UPDATE_EXACT_MATCH_EXPECTED_OUTPUTS ? expectedFile : new File(outputDir + "/snv_feature_mapper_output.vcf"); + + final String[] args = new String[] { + "-R", largeFileTestDir + "/Homo_sapiens_assembly38.fasta.gz", + "-O", outputFile.getAbsolutePath(), + "-I", testDir + "/snv_feature_mapper_input.cram", + "--copy-attr", "RG", + "--copy-attr", "AS,Integer,AS attribute, as copied", + "--copy-attr", "rq,Float", + "--limit-score", "100", + "--min-score", "0", + "--snv-identical-bases", "10", + "--debug-negatives", "false", + "--debug-read-name", "150451-BC94-0645901755", + "--threaded-walker" + }; + + // run the tool + runCommandLine(args); // no assert, just make sure we don't throw + + // make sure we've generated the otuput file + Assert.assertTrue(outputFile.exists()); + + // walk the output and expected files, compare non-comment lines + if ( !UPDATE_EXACT_MATCH_EXPECTED_OUTPUTS ) { + IntegrationTestSpec.assertEqualTextFiles(outputFile, expectedFile, "#"); + } + } + + @Test + public void testMapperWriterThreaded() throws IOException { + + final File outputDir = createTempDir("testFlowFeatureMapperTest"); + final File expectedFile = new File(testDir + "/snv_feature_mapper_output.vcf"); + final File outputFile = UPDATE_EXACT_MATCH_EXPECTED_OUTPUTS ? expectedFile : new File(outputDir + "/snv_feature_mapper_output.vcf"); + + final String[] args = new String[] { + "-R", largeFileTestDir + "/Homo_sapiens_assembly38.fasta.gz", + "-O", outputFile.getAbsolutePath(), + "-I", testDir + "/snv_feature_mapper_input.cram", + "--copy-attr", "RG", + "--copy-attr", "AS,Integer,AS attribute, as copied", + "--copy-attr", "rq,Float", + "--limit-score", "100", + "--min-score", "0", + "--snv-identical-bases", "10", + "--debug-negatives", "false", + "--debug-read-name", "150451-BC94-0645901755", + "--threaded-writer" + }; + + // run the tool + runCommandLine(args); // no assert, just make sure we don't throw + + // make sure we've generated the otuput file + Assert.assertTrue(outputFile.exists()); + + // walk the output and expected files, compare non-comment lines + if ( !UPDATE_EXACT_MATCH_EXPECTED_OUTPUTS ) { + IntegrationTestSpec.assertEqualTextFiles(outputFile, expectedFile, "#"); + } + } } diff --git a/src/test/resources/large/featureMapping/snv_feature_mapper_input.cram b/src/test/resources/large/featureMapping/snv_feature_mapper_input.cram new file mode 100644 index 00000000000..fcc9a60c592 --- /dev/null +++ b/src/test/resources/large/featureMapping/snv_feature_mapper_input.cram @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:44edf4cfe7bf4c3897a2626dcc69f5a16c0a95b74ef101fbfc33e42204b723f9 +size 17140176 diff --git a/src/test/resources/large/featureMapping/snv_feature_mapper_input.cram.crai b/src/test/resources/large/featureMapping/snv_feature_mapper_input.cram.crai new file mode 100644 index 00000000000..df440c58981 --- /dev/null +++ b/src/test/resources/large/featureMapping/snv_feature_mapper_input.cram.crai @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:3fcaf3c2e53ad43c9db324f1573f94ee59cf52b8f2c05246026e17ede9e477ae +size 453 diff --git a/src/test/resources/large/featureMapping/snv_feature_mapper_nm_edit_disance_output.vcf b/src/test/resources/large/featureMapping/snv_feature_mapper_nm_edit_disance_output.vcf new file mode 100644 index 00000000000..031b5335d0b --- /dev/null +++ b/src/test/resources/large/featureMapping/snv_feature_mapper_nm_edit_disance_output.vcf @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:9ff5150c391f7b3fcf254d9efa2fc0e63137b693900ebfe69b655e7ce9c3f88b +size 5719854 diff --git a/src/test/resources/large/featureMapping/snv_feature_mapper_nm_edit_disance_output.vcf.idx b/src/test/resources/large/featureMapping/snv_feature_mapper_nm_edit_disance_output.vcf.idx new file mode 100644 index 00000000000..7929b21fada --- /dev/null +++ b/src/test/resources/large/featureMapping/snv_feature_mapper_nm_edit_disance_output.vcf.idx @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:f4191ea87a60ca0f903100a3a4c2cd1c58be37ec3880c78ae402168f0b1f9006 +size 120129 diff --git a/src/test/resources/large/featureMapping/snv_feature_mapper_output.vcf b/src/test/resources/large/featureMapping/snv_feature_mapper_output.vcf index 15420a30c69..39f57153f94 100644 --- a/src/test/resources/large/featureMapping/snv_feature_mapper_output.vcf +++ b/src/test/resources/large/featureMapping/snv_feature_mapper_output.vcf @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:fec49990080a8d7a21c3d5639c2bcba83a67c5a88d48bc58127fd6c51c10987a -size 5727362 +oid sha256:ce8d9bc5cc31c7e249ba2fc4dc1924e190ddd59b69c0b4654d432f60d7e5846d +size 5719919 diff --git a/src/test/resources/large/featureMapping/snv_feature_mapper_output.vcf.idx b/src/test/resources/large/featureMapping/snv_feature_mapper_output.vcf.idx index 51f9292562f..cc9843cba62 100644 --- a/src/test/resources/large/featureMapping/snv_feature_mapper_output.vcf.idx +++ b/src/test/resources/large/featureMapping/snv_feature_mapper_output.vcf.idx @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:2048ce82810a5752d6fb1de9e6e9c2166bc0cf0981cc63a82f4fd06d4bf81275 +oid sha256:7b0213572039ac7271af11165c3ec9144434bd532f5e17475f36cd0a027ff7a3 size 120113 diff --git a/src/test/resources/large/featureMapping/snv_feature_mapper_qc_failed_output.vcf b/src/test/resources/large/featureMapping/snv_feature_mapper_qc_failed_output.vcf index 19530e544e0..9946c047c32 100644 --- a/src/test/resources/large/featureMapping/snv_feature_mapper_qc_failed_output.vcf +++ b/src/test/resources/large/featureMapping/snv_feature_mapper_qc_failed_output.vcf @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:5868392f6e82af04e84a960f99f958f3f30e2476af3997bf5ad9db70a727961f -size 5057024 +oid sha256:6e7091e446242545c12837f5d6bd03fca745f4d84f74783febe766a2e22eb2f3 +size 5049581 diff --git a/src/test/resources/large/featureMapping/snv_feature_mapper_qc_failed_output.vcf.idx b/src/test/resources/large/featureMapping/snv_feature_mapper_qc_failed_output.vcf.idx index f19964be546..9064273ffbc 100644 --- a/src/test/resources/large/featureMapping/snv_feature_mapper_qc_failed_output.vcf.idx +++ b/src/test/resources/large/featureMapping/snv_feature_mapper_qc_failed_output.vcf.idx @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:94675097a7732fb42d58bbb4c4beed366a5a92b68b106b5f827d9d7d1d1f479e +oid sha256:b90008bf01b3a7a7808594ec58a52e29a9349464e0aec4ec8b9d722c2c3dd032 size 120123 diff --git a/src/test/resources/large/featureMapping/snv_feature_mapper_report_all_alts_output.vcf b/src/test/resources/large/featureMapping/snv_feature_mapper_report_all_alts_output.vcf index 438ff402e44..6b8901ed38a 100644 --- a/src/test/resources/large/featureMapping/snv_feature_mapper_report_all_alts_output.vcf +++ b/src/test/resources/large/featureMapping/snv_feature_mapper_report_all_alts_output.vcf @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:2bb077a1ed1632cf48ea3ee710c1d3517b86c679070e07880e8fffe0ed0acafb -size 22013893 +oid sha256:defc64361ef51bb0ed4d425f937301e006c3beb6b2321ba04d8f6b690026e231 +size 21933008 diff --git a/src/test/resources/large/featureMapping/snv_feature_mapper_report_all_alts_output.vcf.idx b/src/test/resources/large/featureMapping/snv_feature_mapper_report_all_alts_output.vcf.idx index 3306703bc4e..920da615f5e 100644 --- a/src/test/resources/large/featureMapping/snv_feature_mapper_report_all_alts_output.vcf.idx +++ b/src/test/resources/large/featureMapping/snv_feature_mapper_report_all_alts_output.vcf.idx @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:aa90be0835317f993e082e9543aa17e83b8f0ba29c2faa18314de0017356eb23 +oid sha256:323ee6c3bdf8170020a38749e478fcef1a647746e69b611a3c283d7c2066aa96 size 134797 diff --git a/src/test/resources/large/featureMapping/snv_feature_mapper_smq_all_output.vcf b/src/test/resources/large/featureMapping/snv_feature_mapper_smq_all_output.vcf index 075cdd31993..def604f6006 100644 --- a/src/test/resources/large/featureMapping/snv_feature_mapper_smq_all_output.vcf +++ b/src/test/resources/large/featureMapping/snv_feature_mapper_smq_all_output.vcf @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:56033f5f98ef9999741c7f8a0c140045bd42558194a4512f3c4742f8a24e4aab -size 6760184 +oid sha256:7d9408f46cdf74f1195584223503c92aa9be427b749a4323c5213df216d0e564 +size 6752741 diff --git a/src/test/resources/large/featureMapping/snv_feature_mapper_smq_all_output.vcf.idx b/src/test/resources/large/featureMapping/snv_feature_mapper_smq_all_output.vcf.idx index c29140ecdb9..11ba43d408a 100644 --- a/src/test/resources/large/featureMapping/snv_feature_mapper_smq_all_output.vcf.idx +++ b/src/test/resources/large/featureMapping/snv_feature_mapper_smq_all_output.vcf.idx @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:0df79aee9e4be92defe36b7d40f21615a6883c32f99e7df35cbb8c407639d363 +oid sha256:7c74a6d7ff041b57bcb16221a70eec74e0133385c715e6559cff352bf91eb0c5 size 120121 diff --git a/src/test/resources/large/featureMapping/snv_feature_mapper_smq_output.vcf b/src/test/resources/large/featureMapping/snv_feature_mapper_smq_output.vcf index 32a060c9c5e..1578a2aa688 100644 --- a/src/test/resources/large/featureMapping/snv_feature_mapper_smq_output.vcf +++ b/src/test/resources/large/featureMapping/snv_feature_mapper_smq_output.vcf @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:a38048e71ea5b01195bf52b38d07cef3f38e91e80b76f2737165e2e84e78bf1e -size 5783362 +oid sha256:e826d1c8519ac409a42637926e6df7d14d3ed9286fd7d798b1ce2a49ceb489b7 +size 5775919 diff --git a/src/test/resources/large/featureMapping/snv_feature_mapper_smq_output.vcf.idx b/src/test/resources/large/featureMapping/snv_feature_mapper_smq_output.vcf.idx index 340b9475475..eb7a5cfdb2e 100644 --- a/src/test/resources/large/featureMapping/snv_feature_mapper_smq_output.vcf.idx +++ b/src/test/resources/large/featureMapping/snv_feature_mapper_smq_output.vcf.idx @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:6f5755a11e3577ed97ed8ccddded2135a0de1b5bbbdd247087bd355a5f4c6770 +oid sha256:c0f2e687d5dc63d57c5a700989935bda01e53a39eb744e90c94826a37bea6e72 size 120117 diff --git a/src/test/resources/large/featureMapping/snv_feature_mapper_tag_adjs_output.vcf b/src/test/resources/large/featureMapping/snv_feature_mapper_tag_adjs_output.vcf index ae8d2a47b9b..9fd2aa2720f 100644 --- a/src/test/resources/large/featureMapping/snv_feature_mapper_tag_adjs_output.vcf +++ b/src/test/resources/large/featureMapping/snv_feature_mapper_tag_adjs_output.vcf @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:511e7b3faded9b68fddb05098ad681965c8ac3d69a7c0879e1e404e0d7ac0992 -size 250675 +oid sha256:75e86a411dddb92983bc67c46c92ca12b3b77bdf71d7289894de7f439a29c1a1 +size 250744 diff --git a/src/test/resources/large/featureMapping/snv_feature_mapper_tag_adjs_output.vcf.idx b/src/test/resources/large/featureMapping/snv_feature_mapper_tag_adjs_output.vcf.idx index 888d93a2b15..efa61f5cc2a 100644 --- a/src/test/resources/large/featureMapping/snv_feature_mapper_tag_adjs_output.vcf.idx +++ b/src/test/resources/large/featureMapping/snv_feature_mapper_tag_adjs_output.vcf.idx @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:d0e3cb83a487006e075466405dc48dc2e4f5eaa222597361ac77f2cb0d2648cf +oid sha256:4aacba3a23b834b689144a57408eed0620921f678f1d1ae1a71cf1fbd95c268f size 118520 diff --git a/src/test/resources/large/flowBasedAlignment/input_jukebox_for_test.expected.alm b/src/test/resources/large/flowBasedAlignment/input_jukebox_for_test.expected.alm index 693cf6cc895..7be9a9b5c4f 100644 --- a/src/test/resources/large/flowBasedAlignment/input_jukebox_for_test.expected.alm +++ b/src/test/resources/large/flowBasedAlignment/input_jukebox_for_test.expected.alm @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:06c11b05b97e89f99ebe83376e166eaca438f0d748f03da8512e26ca559be6c2 -size 24335517 +oid sha256:8e7071df04b55b1d98e86edd6cf0c5880c630b65a07fd904a0f422e9436848cc +size 24333709