Skip to content

Commit

Permalink
Fix ReservoirDownsampler, PositionalDownsampler, and ReadsDownsamplin…
Browse files Browse the repository at this point in the history
…gIterator.
  • Loading branch information
cmnbroad committed Jan 23, 2019
1 parent c120323 commit 222332c
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ private void finalizePendingReads() {
// the desired coverage, but if the region is decently mappable the shortfall will be minor.
final ReservoirDownsampler wellMappedDownsampler = new ReservoirDownsampler(maxCoverage, false);
pendingReads.stream().filter(read -> read.getMappingQuality() > SUSPICIOUS_MAPPING_QUALITY).forEach(wellMappedDownsampler::submit);
wellMappedDownsampler.signalEndOfInput();
final List<GATKRead> readsToFinalize = wellMappedDownsampler.consumeFinalizedItems();
if (stride > 1) {
Collections.sort(readsToFinalize, Comparator.comparingInt(GATKRead::getAssignedStart));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package org.broadinstitute.hellbender.utils.downsampling;

import htsjdk.samtools.SAMFileHeader;
import org.broadinstitute.hellbender.exceptions.GATKException;
import org.broadinstitute.hellbender.utils.Utils;
import org.broadinstitute.hellbender.utils.read.GATKRead;
import org.broadinstitute.hellbender.utils.read.ReadCoordinateComparator;
import org.broadinstitute.hellbender.utils.read.ReadUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;


/**
Expand Down Expand Up @@ -71,16 +73,31 @@ private void handlePositionalChange( final GATKRead newRead ) {
// Use ReadCoordinateComparator to determine whether we've moved to a new start position.
// ReadCoordinateComparator will correctly distinguish between purely unmapped reads and unmapped reads that
// are assigned a nominal position.
if ( previousRead != null && ReadCoordinateComparator.compareCoordinates(previousRead, newRead, header) != 0 ) {
if ( reservoir.hasFinalizedItems() ) {
finalizeReservoir();
if ( previousRead != null) {
final int cmpDiff = ReadCoordinateComparator.compareCoordinates(previousRead, newRead, header);
if (cmpDiff == 1) {
throw new GATKException.ShouldNeverReachHereException(
String.format("Reads must be coordinate sorted (earlier %s later %s)", previousRead, newRead));
}
if (cmpDiff != 0) {
reservoir.signalEndOfInput();
if ( reservoir.hasFinalizedItems() ) {
finalizeReservoir();
}
}
}
}

private void finalizeReservoir() {
// We can't consume finalized reads from the reservoir unless we first signal EOI.
// Once signalEndOfInput has been called and propagated to the ReservoirDownsampler, consumeFinalizedItems
// must be called on the ReservoirDownsampler before any new items can be submitted to it, to reset it's
// state so it can be recycled/reused for the next downsampling position.
reservoir.signalEndOfInput();
finalizedReads.addAll(reservoir.consumeFinalizedItems());
reservoir.clearItems();
reservoir.resetStats();
previousRead = null;
}

@Override
Expand All @@ -97,9 +114,11 @@ public List<GATKRead> consumeFinalizedItems() {

@Override
public boolean hasPendingItems() {
// The finalized items in the ReservoirDownsampler are pending items from the perspective of the
// enclosing PositionalDownsampler
return reservoir.hasFinalizedItems();
// The ReservoirDownsampler accumulates pending items until signalEndOfInput has been called, at which
// point all items that have survived downsampling become finalized. From the perspective of the enclosing
// PositionalDownsampler, both finalized items and pending items in the ReservoirDownsampler are considered
// pending.
return reservoir.hasFinalizedItems() || reservoir.hasPendingItems();
}

@Override
Expand All @@ -109,9 +128,11 @@ public GATKRead peekFinalized() {

@Override
public GATKRead peekPending() {
// The finalized items in the ReservoirDownsampler are pending items from the perspective of the
// enclosing PositionalDownsampler
return reservoir.peekFinalized();
// The ReservoirDownsampler accumulates pending items until signalEndOfInput has been called, at which
// point all items that have survived downsampling become finalized. From the perspective of the enclosing
// PositionalDownsampler, both finalized items and pending items in the ReservoirDownsampler are considered
// pending.
return Optional.ofNullable(reservoir.peekFinalized()).orElse(reservoir.peekPending());
}

@Override
Expand Down Expand Up @@ -139,6 +160,7 @@ public boolean requiresCoordinateSortOrder() {

@Override
public void signalNoMoreReadsBefore( final GATKRead read ) {
Utils.nonNull(read, "Positional downsampler requires non-null reads");
handlePositionalChange(read);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ public final class ReservoirDownsampler extends ReadsDownsampler {
*/
private int totalReadsSeen;

/**
* In order to guarantee that all reads have equal probability of being discarded, we need to have consumed the
* entire input stream before any items can become finalized. All submitted items (that survive downsampling)
* remain pending until endOfInputStream is called, at which point they become finalized.
*/
private boolean endOfInputStream;


/**
* Construct a ReservoirDownsampler
Expand Down Expand Up @@ -88,6 +95,9 @@ public ReservoirDownsampler(final int targetSampleSize ) {
@Override
public void submit ( final GATKRead newRead ) {
Utils.nonNull(newRead, "newRead");
// Once the end of the input stream has been seen, consumeFinalizedItems must be called to reset the state
// of the ReservoirDownsampler before more items can be submitted
Utils.validate(endOfInputStream == false, "attempt to submit read after end of input stream has been seen");

// Only count reads that are actually eligible for discarding for the purposes of the reservoir downsampling algorithm
totalReadsSeen++;
Expand All @@ -110,35 +120,39 @@ public void submit ( final GATKRead newRead ) {

@Override
public boolean hasFinalizedItems() {
return ! reservoir.isEmpty();
// All items in the reservoir are pending until endOfInputStream is seen, at which point all items become finalized
return endOfInputStream && !reservoir.isEmpty();
}

@Override
public List<GATKRead> consumeFinalizedItems() {
Utils.validate(endOfInputStream == true, "signalEndOfInput must be called before finalized items can be consumed");
if (hasFinalizedItems()) {
// pass reservoir by reference rather than make a copy, for speed
final List<GATKRead> downsampledItems = reservoir;
clearItems();
return downsampledItems;
} else {
// if there's nothing here, don't bother allocating a new list
clearItems();
return Collections.emptyList();
}
}

@Override
public boolean hasPendingItems() {
return false;
// All items in the reservoir are pending until endOfInputStream is seen, at which point all items become finalized
return !endOfInputStream && !reservoir.isEmpty();
}

@Override
public GATKRead peekFinalized() {
return reservoir.isEmpty() ? null : reservoir.get(0);
return hasFinalizedItems() ? reservoir.get(0) : null;
}

@Override
public GATKRead peekPending() {
return null;
return hasPendingItems() ? reservoir.get(0) : null;
}

@Override
Expand All @@ -148,7 +162,7 @@ public int size() {

@Override
public void signalEndOfInput() {
// NO-OP
endOfInputStream = true;
}

/**
Expand All @@ -164,6 +178,8 @@ public void clearItems() {

// an internal stat used by the downsampling process, so not cleared by resetStats() below
totalReadsSeen = 0;

endOfInputStream = false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public void testVCFModeIsConcordantWithGATK3_8Results() throws Exception {
* Test that in VCF mode we're >= 99% concordant with GATK3.8 results
* THIS TEST explodes with an exception because Allele-Specific annotations are not supported in vcf mode yet.
* It's included to parallel the matching (also exploding) test for the non-spark HaplotypeCaller
* {@link org.broadinstitute.hellbender.tools.walkers.haplotypecaller.HaplotypeCallerIntegrationTest#testVCFModeIsConcordantWithGATK3_8ResultsAlleleSpecificAnnotations()}
* {@link org.broadinstitute.hellbender.tools.walkers.haplotypecaller.HaplotypeCallerIntegrationTest#testVCFModeIsConcordantWithGATK3_8ResultsAlleleSpecificAnnotations(String, String)}}
*/
@Test(expectedExceptions = UserException.class)
public void testVCFModeIsConcordantWithGATK3_8ResultsAlleleSpecificAnnotations() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.testng.annotations.Test;

import java.util.*;
import java.util.stream.IntStream;

public class ReadsDownsamplingIteratorUnitTest extends GATKBaseTest {

Expand Down Expand Up @@ -132,6 +133,22 @@ public void testRemoveThrows() {
iter.remove();
}

@Test
public void testReadsDownsamplingIteratorWithReservoirDownsampler() {
final int TOTAL_READ_COUNT = 100;
final int TARGET_COVERAGE = 45;
final List<GATKRead> reads = new ArrayList<>(TOTAL_READ_COUNT);
IntStream.range(1, TOTAL_READ_COUNT).forEach(i -> reads.add(readWithName(Integer.toString(i))));

final List<GATKRead> downsampledReads = new ArrayList<>();
final ReadsDownsamplingIterator downsamplingIter = new ReadsDownsamplingIterator(reads.iterator(), new ReservoirDownsampler(TARGET_COVERAGE));
for ( final GATKRead read : downsamplingIter ) {
downsampledReads.add(read);
}

Assert.assertEquals(downsampledReads.size(), TARGET_COVERAGE);
}

private GATKRead readWithName( final String name ) {
return ArtificialReadUtils.createArtificialRead(TextCigarCodec.decode("10M"), name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,24 +66,26 @@ public void testReservoirDownsampler(final ReservoirDownsamplerTest test ) {

downsampler.submit(test.createReads());

// after submit, but before signalEndOfInput, all reads are pending, none are finalized
if ( test.totalReads > 0 ) {
Assert.assertTrue(downsampler.hasFinalizedItems());
Assert.assertTrue(downsampler.peekFinalized() != null);
Assert.assertFalse(downsampler.hasPendingItems());
Assert.assertTrue(downsampler.peekPending() == null);
Assert.assertFalse(downsampler.hasFinalizedItems());
Assert.assertNull(downsampler.peekFinalized());
Assert.assertTrue(downsampler.hasPendingItems());
Assert.assertNotNull(downsampler.peekPending());
}
else {
Assert.assertFalse(downsampler.hasFinalizedItems() || downsampler.hasPendingItems());
Assert.assertTrue(downsampler.peekFinalized() == null && downsampler.peekPending() == null);
}

// after signalEndOfInput, not reads are pending, all are finalized
downsampler.signalEndOfInput();

if ( test.totalReads > 0 ) {
Assert.assertTrue(downsampler.hasFinalizedItems());
Assert.assertTrue(downsampler.peekFinalized() != null);
Assert.assertNotNull(downsampler.peekFinalized());
Assert.assertFalse(downsampler.hasPendingItems());
Assert.assertTrue(downsampler.peekPending() == null);
Assert.assertNull(downsampler.peekPending());
}
else {
Assert.assertFalse(downsampler.hasFinalizedItems() || downsampler.hasPendingItems());
Expand Down

0 comments on commit 222332c

Please sign in to comment.