
Added an option to ReadsSparkSink specifying whether to sort the reads on output. #4874

Merged: 10 commits from je_parameterizeReadsSparkSourceSort into master on Feb 12, 2019

Conversation

jamesemery (Collaborator):

Fixes #4859

Depends on #4545

jamesemery force-pushed the je_parameterizeReadsSparkSourceSort branch from 6d4f8fc to 12b502d on June 14, 2018 16:11
jamesemery (Collaborator, Author):

@lbergelson Can we revisit this branch? We need to clean up the current sorting behavior, as it is totally inconsistent and confusing. This is an attempt to make the behavior more uniform.

jamesemery force-pushed the je_parameterizeReadsSparkSourceSort branch from 12b502d to 538658a on July 6, 2018 14:51
droazen requested a review from tomwhite on August 27, 2018 19:20
droazen assigned tomwhite and unassigned lbergelson on Aug 27, 2018
droazen (Collaborator) commented Aug 27, 2018:

@tomwhite Can you please have a look at this PR when you have time?

jamesemery (Collaborator, Author):

The idea behind this branch: make the sorting of ReadsSparkSink output consistent and configurable, so that if a tool alters reads without changing their sort order, no sort is performed by default. It also means that if you request sharded output, you can ask ReadsSparkSink to sort the file for you.
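
A minimal sketch of the two call patterns this enables, assuming the nine-argument writeReads overload and the sortReadsToHeader parameter name shown in the diffs below (class and package names are GATK's):

import java.io.IOException;
import htsjdk.samtools.SAMFileHeader;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.broadinstitute.hellbender.engine.spark.datasources.ReadsSparkSink;
import org.broadinstitute.hellbender.utils.read.GATKRead;
import org.broadinstitute.hellbender.utils.read.ReadsWriteFormat;

final class SortOnOutputExample {
    // A tool that altered reads without changing their order can ask the sink not to re-sort.
    static void writeUnsorted(final JavaSparkContext ctx, final String out, final String ref,
                              final JavaRDD<GATKRead> reads, final SAMFileHeader header,
                              final int numReducers, final String partsDir) throws IOException {
        ReadsSparkSink.writeReads(ctx, out, ref, reads, header,
                ReadsWriteFormat.SHARDED, numReducers, partsDir, /* sortReadsToHeader */ false);
    }

    // Sharded output can now opt in to being sorted to match the header.
    static void writeShardedSorted(final JavaSparkContext ctx, final String out, final String ref,
                                   final JavaRDD<GATKRead> reads, final SAMFileHeader header,
                                   final int numReducers, final String partsDir) throws IOException {
        ReadsSparkSink.writeReads(ctx, out, ref, reads, header,
                ReadsWriteFormat.SHARDED, numReducers, partsDir, /* sortReadsToHeader */ true);
    }
}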

tomwhite (Contributor) left a comment:

Overall change looks like a good one, but the test isn't quite done.

@@ -166,7 +166,7 @@ public SparkHeaderlessCRAMOutputFormat() {
 public static void writeReads(
         final JavaSparkContext ctx, final String outputFile, final String referenceFile, final JavaRDD<GATKRead> reads,
         final SAMFileHeader header, ReadsWriteFormat format) throws IOException {
-    writeReads(ctx, outputFile, referenceFile, reads, header, format, 0, null);
+    writeReads(ctx, outputFile, referenceFile, reads, header, format, 0, null,format==ReadsWriteFormat.SINGLE);
tomwhite (Contributor):

Nit: missing space before comma

lbergelson (Member):

Let's add a comment about how this is here to maintain the old weird behavior.

@@ -242,6 +242,26 @@ public void testUnSorted() throws Exception {
     SamAssertionUtils.assertSamsEqual(outBam, inBam);
 }

+@Test( groups = "spark")
+public void testUnSorted() throws Exception {
tomwhite (Contributor):

This test looks like it's not finished? It shares a name with an existing test, and print_reads.mismatchedHeader.sam is unused.

lbergelson (Member) left a comment:

@jamesemery The basic change looks OK to me. It's failing to compile the tests because of the duplicated test name that Tom pointed out. I have a few minor comments, but once the test is fixed I think it should be good to go, assuming nothing else is failing.

Should we open a new ticket after this to revisit the different default treatment of sharded and non-sharded output? It seems like we should get rid of that because it's confusing, but I assume you left that bit in to avoid breaking things and to land the option to control it sooner.

@@ -201,19 +202,20 @@ public static void writeReads(
     // SAMRecords, this will effectively be a no-op. The SAMRecords will be headerless
     // for efficient serialization.
     final JavaRDD<SAMRecord> samReads = reads.map(read -> read.convertToSAMRecord(null));
+    final JavaRDD<SAMRecord> readsToUse = sortReadsToHeader ? sortSamRecordsToMatchHeader(samReads, header, numReducers) : samReads;
lbergelson (Member):

Maybe readsToUse -> readsToOutput would be clearer?
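
For readers outside the codebase: sortSamRecordsToMatchHeader reorders the RDD of SAMRecords into the sort order declared by the header before writing. A rough, hypothetical sketch of the coordinate-sorted case (the real helper in ReadsSparkSink handles every sort order and edge cases such as unmapped reads):

import htsjdk.samtools.SAMRecord;
import org.apache.spark.api.java.JavaRDD;

final class CoordinateSortSketch {
    // Hypothetical sketch only; not the actual ReadsSparkSink implementation.
    static JavaRDD<SAMRecord> sortToCoordinateOrder(final JavaRDD<SAMRecord> reads, final int numReducers) {
        final int numPartitions = numReducers > 0 ? numReducers : reads.getNumPartitions();
        return reads.sortBy(
                // Pack (referenceIndex, alignmentStart) into a single long sort key;
                // unmapped reads (reference index -1) sort first in this simplified version.
                read -> (((long) read.getReferenceIndex()) << 32) | (read.getAlignmentStart() & 0xFFFFFFFFL),
                true,           // ascending
                numPartitions); // number of output partitions
    }
}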

jamesemery (Collaborator, Author):

@lbergelson @droazen This branch has been brought up to speed with master now; I need somebody to review it, however...

codecov-io commented Feb 8, 2019:

Codecov Report

Merging #4874 into master will increase coverage by 0.005%.
The diff coverage is 90.909%.

@@               Coverage Diff               @@
##              master     #4874       +/-   ##
===============================================
+ Coverage     87.044%   87.049%   +0.005%     
- Complexity     31693     31697        +4     
===============================================
  Files           1938      1938               
  Lines         146041    146097       +56     
  Branches       16124     16128        +4     
===============================================
+ Hits          127120    127176       +56     
  Misses         13036     13036               
  Partials        5885      5885
Impacted Files Coverage Δ Complexity Δ
...nder/tools/spark/pathseq/PathSeqPipelineSpark.java 81.25% <ø> (ø) 7 <0> (ø) ⬇️
...lbender/tools/spark/pathseq/PathSeqScoreSpark.java 57.407% <ø> (ø) 7 <0> (ø) ⬇️
...llbender/engine/spark/DataprocIntegrationTest.java 1.786% <0%> (+0.062%) 1 <0> (ø) ⬇️
...hellbender/tools/spark/pipelines/SortSamSpark.java 100% <100%> (ø) 5 <0> (-1) ⬇️
...ellbender/tools/spark/pathseq/PathSeqBwaSpark.java 67.391% <100%> (ø) 7 <0> (ø) ⬇️
...ls/ExtractOriginalAlignmentRecordsByNameSpark.java 90.909% <100%> (ø) 10 <0> (ø) ⬇️
...stitute/hellbender/tools/spark/RevertSamSpark.java 83.895% <100%> (ø) 86 <0> (ø) ⬇️
...institute/hellbender/tools/spark/bwa/BwaSpark.java 78.947% <100%> (+1.17%) 7 <0> (ø) ⬇️
...stitute/hellbender/engine/spark/GATKSparkTool.java 82.418% <100%> (ø) 78 <1> (ø) ⬇️
...nder/tools/spark/pipelines/ReadsPipelineSpark.java 90.741% <100%> (ø) 13 <0> (ø) ⬇️
... and 12 more

lbergelson (Member) left a comment:

I think you have a bug, and a few random comments.

-    writeReads(ctx, outputFile, referenceFile, reads, header, format, 0, null);
+    // NOTE, we must include 'format==ReadsWriteFormat.SINGLE' to preserve the old default behavior for writing spark output
+    // which would not sort the bam if outputting to ReadsWriteFormat.SINGLE. Please use the overload for different sorting behavior.
+    writeReads(ctx, outputFile, referenceFile, reads, header, format, 0, null, format==ReadsWriteFormat.SINGLE);
lbergelson (Member):

I'd like to remove this particular weirdness. How hard is it to just eliminate this behavior?

lbergelson (Member):

I think the only remaining use is in BwaSpark, and that could be changed to just always sort.

lbergelson (Member):

Or to take a parameter to sort, I guess.

lbergelson (Member):

Either way...

-        final SAMFileHeader header, ReadsWriteFormat format, final int numReducers, final String outputPartsDir) throws IOException {
-    writeReads(ctx, outputFile, referenceFile, reads, header, format, numReducers, outputPartsDir, true, true);
+        final SAMFileHeader header, ReadsWriteFormat format, final int numReducers, final String outputPartsDir, final boolean sortReadsToHeader) throws IOException {
+    writeReads(ctx, outputFile, referenceFile, reads, header, format, numReducers, outputPartsDir, true, true, true);
lbergelson (Member):

This seems like a bug... you added a boolean parameter to the method and then set it to always be true.
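
Presumably the intended fix inside ReadsSparkSink is to forward the caller's flag rather than hard-coding it; a minimal sketch under that assumption:

public static void writeReads(
        final JavaSparkContext ctx, final String outputFile, final String referenceFile,
        final JavaRDD<GATKRead> reads, final SAMFileHeader header, final ReadsWriteFormat format,
        final int numReducers, final String outputPartsDir, final boolean sortReadsToHeader) throws IOException {
    // Forward sortReadsToHeader instead of a hard-coded true; the two preceding
    // booleans are the writeBai/writeSbi defaults seen elsewhere in this diff.
    writeReads(ctx, outputFile, referenceFile, reads, header, format, numReducers,
            outputPartsDir, true, true, sortReadsToHeader);
}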

@@ -48,9 +48,6 @@ private void shutdownMiniCluster() {
 public Object[][] loadReadsBAM() {
     return new Object[][]{
             {testDataDir + "tools/BQSR/HiSeq.1mb.1RG.2k_lines.bam", "ReadsSparkSinkUnitTest1", null, ".bam", true, true},
-            {testDataDir + "tools/BQSR/HiSeq.1mb.1RG.2k_lines.bam", "ReadsSparkSinkUnitTest1", null, ".bam", true, false}, // write BAI, don't write SBI
lbergelson (Member):

Did you mean to delete these test cases?

@@ -149,7 +146,7 @@ private void assertSingleShardedWritingWorks(String inputBam, String referenceFi
     JavaRDD<GATKRead> rddParallelReads = readSource.getParallelReads(inputBam, referenceFile);
     SAMFileHeader header = readSource.getHeader(inputBam, referenceFile);

-    ReadsSparkSink.writeReads(ctx, outputPath, referenceFile, rddParallelReads, header, ReadsWriteFormat.SINGLE, 0, outputPartsPath, writeBai, writeSbi);
+    ReadsSparkSink.writeReads(ctx, outputPath, referenceFile, rddParallelReads, header, ReadsWriteFormat.SINGLE, 0, outputPartsPath, writeBai, writeSbi, true);
lbergelson (Member):

You should have a test case for writing unsorted reads in this class, I think.

}
final File outBam = GATKBaseTest.createTempFile("print_reads", ".bam");
ArgumentsBuilder args = new ArgumentsBuilder();
args.add("--" + StandardArgumentDefinitions.INPUT_LONG_NAME);
lbergelson (Member):

We have convenience methods for this.
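
The convenience methods presumably meant here are ArgumentsBuilder's addInput/addOutput-style helpers (a hedged example; exact method names may vary across GATK versions):

final ArgumentsBuilder args = new ArgumentsBuilder();
// Assumed helpers that wrap the standard --input/--output arguments:
args.addInput(inBam);    // replaces args.add("--" + StandardArgumentDefinitions.INPUT_LONG_NAME) plus the value
args.addOutput(outBam);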

jamesemery (Collaborator, Author):

@lbergelson Responded to your comments. I left the behavior in BwaSpark for sharded writing, as I suspect that was the specific use case for which it was introduced. Back to you...

jamesemery force-pushed the je_parameterizeReadsSparkSourceSort branch from a874279 to 34c8e34 on February 11, 2019 16:59
-    writeReads(ctx, outputFile, referenceFile, reads, header, format, 0, null);
+    // NOTE, we must include 'format==ReadsWriteFormat.SINGLE' to preserve the old default behavior for writing spark output
+    // which would not sort the bam if outputting to ReadsWriteFormat.SINGLE. Please use the overload for different sorting behavior.
+    writeReads(ctx, outputFile, referenceFile, reads, header, format, 0, null, true);
lbergelson (Member):

If you think this behavior is important to BwaSpark, push it down there. This is going to bite us in the future if we leave it in.

lbergelson (Member):

Oh, you did, you just forgot the comment.

@@ -74,7 +74,7 @@ protected void runTool(final JavaSparkContext ctx) {
     }
     try {
         ReadsSparkSink.writeReads(ctx, output, null, reads, bwaEngine.getHeader(),
-                shardedOutput ? ReadsWriteFormat.SHARDED : ReadsWriteFormat.SINGLE);
+                shardedOutput ? ReadsWriteFormat.SHARDED : ReadsWriteFormat.SINGLE, getRecommendedNumReducers(), shardedPartsDir, shardedOutput);
lbergelson (Member):

Maybe comment here so people know that something weird is going on?
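
The requested note might read something like the following (hypothetical wording; the actual comment lands in a later push):

// NOTE: the sort flag is deliberately tied to shardedOutput here to preserve
// BwaSpark's historical output behavior. (Hypothetical wording.)
ReadsSparkSink.writeReads(ctx, output, null, reads, bwaEngine.getHeader(),
        shardedOutput ? ReadsWriteFormat.SHARDED : ReadsWriteFormat.SINGLE,
        getRecommendedNumReducers(), shardedPartsDir, shardedOutput);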

jamesemery (Collaborator, Author):

@lbergelson Added a comment to BwaSpark. Can this be merged?

lbergelson (Member) left a comment:

👍

jamesemery dismissed tomwhite's stale review February 12, 2019 14:33: "It's been approved by Louis."

jamesemery merged commit 9c22c34 into master on Feb 12, 2019
jamesemery deleted the je_parameterizeReadsSparkSourceSort branch on February 12, 2019 14:34