Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ReservoirDownsampler, PositionalDownsampler, and ReadsDownsamplingIterator. #5594

Merged
merged 10 commits into from
Jan 29, 2019

Conversation

cmnbroad
Copy link
Collaborator

Fixes #4768.

The ReservoirDownsampler currently declares reads to be finalized immediately after they're submitted, but in order to guaranty that every read has equal probability of being discarded, it should consume the entire stream of input items before declaring any item to be finalized. When used with a ReadsDownsamplingIterator, no reads are ever downsampled because the iterator populates it's internal cache by eagerly consuming finalized reads as soon as they become available.

Also, PositionalDownsampler doesn't reset it's internal ReservoirDownsampler's state correctly.

@droazen droazen self-requested a review January 22, 2019 20:28
@droazen droazen self-assigned this Jan 22, 2019
@codecov-io
Copy link

codecov-io commented Jan 22, 2019

Codecov Report

Merging #5594 into master will decrease coverage by 80.096%.
The diff coverage is 0%.

@@               Coverage Diff               @@
##              master    #5594        +/-   ##
===============================================
- Coverage     87.046%   6.951%   -80.096%     
+ Complexity     31524     2797     -28727     
===============================================
  Files           1928     1934         +6     
  Lines         145340   145727       +387     
  Branches       16089    16104        +15     
===============================================
- Hits          126513    10129    -116384     
- Misses         12966   134867    +121901     
+ Partials        5861      731      -5130
Impacted Files Coverage Δ Complexity Δ
...der/tools/HaplotypeCallerSparkIntegrationTest.java 2.439% <ø> (-65.854%) 2 <0> (-14)
...ender/utils/downsampling/ReservoirDownsampler.java 0% <0%> (-100%) 0 <0> (-21)
...llbender/utils/downsampling/MutectDownsampler.java 0% <0%> (-68.182%) 0 <0> (-17)
...ownsampling/ReadsDownsamplingIteratorUnitTest.java 1.639% <0%> (-64.361%) 1 <0> (-7)
...ils/downsampling/ReservoirDownsamplerUnitTest.java 1.449% <0%> (-75.6%) 1 <0> (-9)
...nder/utils/downsampling/PositionalDownsampler.java 0% <0%> (-100%) 0 <0> (-21)
...ls/variant/writers/GVCFBlockCombiningIterator.java 0% <0%> (-100%) 0% <0%> (-1%)
...ark/AssemblyRegionReadShardArgumentCollection.java 0% <0%> (-100%) 0% <0%> (-1%)
...nder/tools/copynumber/utils/TagGermlineEvents.java 0% <0%> (-100%) 0% <0%> (-3%)
...r/tools/spark/pathseq/PSBwaArgumentCollection.java 0% <0%> (-100%) 0% <0%> (-1%)
... and 1772 more

@cmnbroad cmnbroad force-pushed the cn_downsampling_iterator branch 2 times, most recently from e9cb24e to 0d6c241 Compare January 22, 2019 23:43
@ldgauthier
Copy link
Contributor

I'm surprised none of the variant calling integration tests change. @cmnbroad Would you expect this to change the behavior in any common use cases or is this more of a safety check?

It's admittedly also possible that the MT calling in the Mutect2 integration test does change slightly, but that's a very lenient concordance check.

@cmnbroad
Copy link
Collaborator Author

cmnbroad commented Jan 23, 2019

@ldgauthier Yeah, I was surprised too. The problematic pattern is interleaving submit and has/consumeFinalizedItems calls on a ReservoirDownsampler. ReadsDownsamplingIterator uses that pattern that, but I don't see any existing code that uses it with a ReservoirDownsampler.

Interestingly, PositionalDownsampler, which uses a ReservoirDownsampler, masks the reservoir problem even when its used with `ReadsDownsamplingIterator by providing its own positional-based finalization semantics, and never consuming finalized items from the reservoir until it sees a position change.

The Mutect code also doesn't interleave calls, so the change there is just to prevent triggering of the new checks I added. AssemblyRegionWalker does compose Downsamplers with a ReadsDownsamplingIterator, but AFACIT not directly with a ReservoirDownsampler.

@droazen
Copy link
Collaborator

droazen commented Jan 23, 2019

Not surprised that tests didn't change -- we only use the PositionalDownsampler inside a downsampling iterator for HC/M2, never a ReservoirDownsampler directly. I believe that the CNN tool is the first case where we wanted to use a ReservoirDownsampler directly inside an iterator, and ran into this bug.

Copy link
Collaborator

@droazen droazen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cmnbroad Back to you with my comments

finalizedReads.addAll(reservoir.consumeFinalizedItems());
reservoir.clearItems();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The call above to reservoir.consumeFinalizedItems() calls clearItems() internally, so you don't need to call it again here. Each clearItems() call results in an extra array allocation, so we want to minimize calls to this method, since downsampling gets performed in some performance-critical sections of code.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right.

@@ -94,6 +94,7 @@ private void finalizePendingReads() {
// the desired coverage, but if the region is decently mappable the shortfall will be minor.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SamplePartitioner also uses a ReservoirDownsampler directly -- did you check the usage there to ensure it's consistent with the new semantics?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did, and just did so again to make sure- it looks like it follows the protocol pretty well and reliably calls signalEndOfInput before consuming items.

}
}
}

private void finalizeReservoir() {
// We can't consume finalized reads from the reservoir unless we first signal EOI.
// Once signalEndOfInput has been called and propagated to the ReservoirDownsampler, consumeFinalizedItems
// must be called on the ReservoirDownsampler before any new items can be submitted to it, to reset it's
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's -> its

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

if ( previousRead != null) {
final int cmpDiff = ReadCoordinateComparator.compareCoordinates(previousRead, newRead, header);
if (cmpDiff == 1) {
throw new GATKException.ShouldNeverReachHereException(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't be a ShouldNeverReachHereException -- we can easily reach here if the client provides reads in the wrong order. Make it just a regular GATKException or an IllegalStateException

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. BTW, I originally added this check because the HaplotypeCallerSpark tests were getting into this state after I made my changes, but before I added the code that resets previousRead to null in finalizeReservoir (see my response to your comment about that below).

throw new GATKException.ShouldNeverReachHereException(
String.format("Reads must be coordinate sorted (earlier %s later %s)", previousRead, newRead));
}
if (cmpDiff != 0) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if -> else if

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@@ -88,6 +95,9 @@ public ReservoirDownsampler(final int targetSampleSize ) {
@Override
public void submit ( final GATKRead newRead ) {
Utils.nonNull(newRead, "newRead");
// Once the end of the input stream has been seen, consumeFinalizedItems must be called to reset the state
// of the ReservoirDownsampler before more items can be submitted
Utils.validate(endOfInputStream == false, "attempt to submit read after end of input stream has been seen");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

endOfInputStream == false -> ! endOfInputStream

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

has been seen -> has been signaled

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}

@Override
public List<GATKRead> consumeFinalizedItems() {
Utils.validate(endOfInputStream == true, "signalEndOfInput must be called before finalized items can be consumed");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If consumeFinalizedItems() is called when there are no finalized items, it should return an empty list, not throw an exception.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you suggesting that this validate call be removed ? It just validates that signalEndOfInput has been called, which is a precondition for having finalized items. To me, hitting this exception indicates a contract violation - the caller shouldn't expect there to be finalized items until eoi is called. This caught the case in MutectDownsampler where it was calling submit followed by consume with no intervening signalEndOfInput.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think it should be removed. The contract of consumeFinalizedItems() is that it should return an empty list if there are no finalized items (without any side effects such as resetting the downsampler) -- this check violates that contract by throwing an exception instead.

if (hasFinalizedItems()) {
// pass reservoir by reference rather than make a copy, for speed
final List<GATKRead> downsampledItems = reservoir;
clearItems();
return downsampledItems;
} else {
// if there's nothing here, don't bother allocating a new list
clearItems();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above -- calling consumeFinalizedItems() when there are no finalized items should return an empty list, and not change the internal state of the downsampler (eg., by calling clearItems()).

Copy link
Collaborator Author

@cmnbroad cmnbroad Jan 25, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This if-branch is the case where end of input has been signaled, but no items were ever submitted. Removing the clearItems call would require the caller to detect when consumeFinalizedItems returns 0 items, and then also call clearItems(), in order to reset the state to reuse the downsampler. Doesn't that unnecessarily complicate the contract ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At some point we should consider adding a method to the Downsampler interface called something like resetState to make the contract on how to recycle these more explicit.

Copy link
Collaborator

@droazen droazen Jan 25, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most downsamplers don't get reused after signalEndOfInput() -- the ReservoirDownsampler within a downsampling iterator is an exception, so we shouldn't over-optimize for that case. It also doesn't make sense to me for consumeFinalizedItems() to have side effects like resetting the downsampler if there are no finalized items. Returning an empty list without any side effects seems much more logical.

}

Assert.assertEquals(downsampledReads.size(), TARGET_COVERAGE);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can I suggest that you also modify the existing test ReservoirDownsamplerUnitTest.testReservoirDownsampler() to check, at the end, that the results from running with a ReservoirDownsampler wrapped in a ReadsDownsamplingIterator match the results from running standalone?

PositionalDownsamplerUnitTest.testPositionalDownsampler() has a check like this at the end, to illustrate what I mean:

        // Now test with a PositionalDownsampler wrapped in an iterator, and make sure we get the same results.
        // It's crucial to reset the random number generator again in order to match the selections made by the
        // first downsampling pass.
        Utils.resetRandomGenerator();
        final ReadsDownsamplingIterator downsamplingIter = new ReadsDownsamplingIterator(allReads.iterator(), new PositionalDownsampler(targetCoverage, header));
        final List<GATKRead> downsampledReadsFromIter = new ArrayList<>();
        for ( final GATKRead downsampledRead : downsamplingIter ) {
            downsampledReadsFromIter.add(downsampledRead);
        }

        Assert.assertEquals(downsampledReadsFromIter, downsampledReads, "Results from PositionalDownsampler wrapped in a ReadsDownsamplingIterator do not match results from standalone PositionalDownsampler");

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah good idea. Done.

}
else {
Assert.assertFalse(downsampler.hasFinalizedItems() || downsampler.hasPendingItems());
Assert.assertTrue(downsampler.peekFinalized() == null && downsampler.peekPending() == null);
}

// after signalEndOfInput, not reads are pending, all are finalized
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not -> no

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@droazen droazen assigned cmnbroad and unassigned droazen Jan 24, 2019
@cmnbroad
Copy link
Collaborator Author

back to @droazen.

@droazen
Copy link
Collaborator

droazen commented Jan 25, 2019

@cmnbroad Back to you -- ReservoirDownsampler.consumeFinalizedItems() still needs to be updated to return an empty list with no side effects when there are no finalized items.

@droazen droazen removed their assignment Jan 25, 2019
@cmnbroad
Copy link
Collaborator Author

Back to @droazen. Also in the last round, I didn't actually add the code in PositionalDownSampler to reject submit after signalEndOfInput was called (I added the state to keep track of it, but left out the actual validate call in submit). Letting tests run then, back to you.

@cmnbroad
Copy link
Collaborator Author

Well, when the clearItems call is removed from the consumeFinalizeItems else branch, some HaplotypeCallerSparkIntegrationTests fail because PushToPullIterator doesn't call clearItems to reset the state when consumeFinalizeItems returns no items, and the next submit is rejected because eoi hasn't been reset. So, since consumeFinalizeItems can't reset/mutate the state when there are no items, then we can't assert the precondition that endOfInput==false in submit. So I'm removing the validation of that from both ReservoirDownsampler and PositionalDownsampler.

@droazen
Copy link
Collaborator

droazen commented Jan 29, 2019

@cmnbroad I've implemented a compromise approach in ReservoirDownsampler.consumeFinalizedItems() that I think satisfies both of our concerns:

  • If consumeFinalizedItems() is called after end of input has been signaled, it always clears state (including the end of input flag itself), regardless of whether there are any finalized items
  • If consumeFinalizedItems() is called before end of input has been signaled, it returns an empty List and does not clear state, since in that case the downsampling process is still ongoing and we want to preserve pending items.

I've also added tests to verify this new behavior. Let me know what you think.

@cmnbroad
Copy link
Collaborator Author

@droazen That looks like a good compromise. If github would let me approve my own pull request, I would.

@droazen
Copy link
Collaborator

droazen commented Jan 29, 2019

I'll merge as soon as tests pass (ETA ~25 minutes).

@droazen droazen merged commit a569c97 into master Jan 29, 2019
@droazen droazen deleted the cn_downsampling_iterator branch January 29, 2019 21:22
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants