Replace Hadoop-BAM with Disq #5138
Preview of the changes needed to use Disq.
There are currently two failing tests, both of which need fixes in htsjdk.
@tomwhite After spending some time searching for this feature for my testing purposes, I think it would be helpful to expose the NIO adapter toggle directly from the command line in this branch.
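For reference, a minimal sketch of what such a toggle could look like as a Barclay argument; the flag name "use-nio" and its placement are my assumptions for illustration, not this branch's actual API:

```java
import org.broadinstitute.barclay.argparser.Argument;

// Hypothetical sketch: expose the NIO adapter toggle as a command-line flag.
// The name "use-nio" and the default value are assumptions, not the PR's API.
public abstract class NioToggleSketch {
    @Argument(fullName = "use-nio",
            doc = "If true, use the NIO adapter to access files directly rather than the Hadoop filesystem layer.",
            optional = true)
    public boolean useNio = false;
}
```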
Force-pushed from 0be0976 to e7afd79.
htsjdk and the Disq snapshot have been updated, and the previously failing tests now pass.
Codecov Report
@@             Coverage Diff              @@
##             master     #5138       +/-   ##
================================================
- Coverage    86.988%   73.113%   -13.875%
+ Complexity    31224     24503      -6721
================================================
  Files          1914      1813       -101
  Lines        144264    134638      -9626
  Branches      15956     14915      -1041
================================================
- Hits         125492     98438     -27054
- Misses        13003     31489     +18486
+ Partials       5769      4711      -1058
================================================
Generally speaking the changes are good and clear up some of the clutter in our Spark methods. Unfortunately, some unrelated changes are bundled into this branch, which makes it difficult to evaluate properly. I will take a closer look at the changes made in the GVCF code soon.
// The underlying reads are required to be in SAMRecord format in order to be
// written out, so we convert them to SAMRecord explicitly here. If they're already
// SAMRecords, this will effectively be a no-op. The SAMRecords will be headerless
// for efficient serialization.
// TODO: add header here
TODO
Done
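For clarity, here is a minimal sketch of the conversion the comment above describes, assuming GATKRead's convertToSAMRecord(SAMFileHeader) method; the helper name is hypothetical:

```java
import htsjdk.samtools.SAMRecord;
import org.apache.spark.api.java.JavaRDD;
import org.broadinstitute.hellbender.utils.read.GATKRead;

// Hypothetical helper: convert GATKReads to SAMRecords for writing. Passing a
// null header keeps the records headerless, so they serialize compactly; reads
// already backed by a SAMRecord convert in effectively a no-op.
static JavaRDD<SAMRecord> toHeaderlessSamRecords(final JavaRDD<GATKRead> reads) {
    return reads.map(read -> read.convertToSAMRecord(null));
}
```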
final JavaSparkContext ctx, final String outputFile, final String referenceFile, final JavaRDD<SAMRecord> reads,
final SAMFileHeader header, final int numReducers, final WriteOption... writeOptions) throws IOException {

    final JavaRDD<SAMRecord> sortedReads = sortSamRecordsToMatchHeader(reads, header, numReducers);
Did this not break tests? There are tests in the codebase right now that forcing a sort on every sharded output should and does break; #4874 ran into this problem.
Additionally, something should be written to stdout before and after this stage: when we added the sort, we found that it turned the final Spark output into an opaque step that appeared to produce no output.
A line like logger.info("Finished sorting the bam file and dumping read shards to disk, proceeding to merge the shards into a single file using the master thread");
from the old incarnation of this method should be preserved in the appropriate case.
Though on second thought this may or may not be necessary, given that we stay within Spark for the next several stages...
No tests have been removed, and they all pass. Do you think there is another case that needs covering?
We can add logging to Disq for some of these operations.
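To make the discussion concrete, here is a hedged sketch of what a sortSamRecordsToMatchHeader along these lines might do; the key functions are illustrative assumptions, not this branch's implementation:

```java
import htsjdk.samtools.SAMFileHeader;
import htsjdk.samtools.SAMRecord;
import org.apache.spark.api.java.JavaRDD;

// Sketch: sort an RDD of SAMRecords to match the sort order declared in the header.
static JavaRDD<SAMRecord> sortToMatchHeader(final JavaRDD<SAMRecord> reads,
                                            final SAMFileHeader header,
                                            final int numReducers) {
    switch (header.getSortOrder()) {
        case coordinate:
            // key by (reference index, alignment start); packing both into one long
            // is an illustrative shortcut (unmapped reads would need special casing)
            return reads.sortBy(r -> ((long) r.getReferenceIndex() << 32) | r.getAlignmentStart(),
                    true, numReducers);
        case queryname:
            return reads.sortBy(SAMRecord::getReadName, true, numReducers);
        default:
            return reads; // unsorted/unknown: leave the order alone
    }
}
```

A logger.info call before and after the sort, as suggested above, would restore the visibility the old method had.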
}
else {
if (null == referenceName) {
static String checkCramReference(final JavaSparkContext ctx, final String filePath, final String referencePath) {
This needs a more informative javadoc that enumerates which cases are accepted and which aren't.
Done
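As an example of the enumeration being asked for, here is a hedged javadoc sketch; the accepted and rejected cases listed are my assumptions about typical CRAM handling, not necessarily the final wording:

```java
/**
 * Checks that the reference supplied for a CRAM input is usable on Spark.
 *
 * Accepted: a non-CRAM input (no reference required), or a CRAM input with a
 * fasta reference readable by every executor (e.g. a local or NIO path).
 * Rejected: a CRAM input with no reference, or a CRAM input with a .2bit
 * reference; these throw a UserException.
 *
 * @return the validated reference path, or null when none is required
 */
static String checkCramReference(final JavaSparkContext ctx, final String filePath, final String referencePath) {
    // validation logic elided in this sketch
    return referencePath;
}
```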
if (outputFile.endsWith(IOUtil.BCF_FILE_EXTENSION) || outputFile.endsWith(IOUtil.BCF_FILE_EXTENSION + ".gz")) {
    throw new UserException.UnimplementedFeature("It is currently not possible to write a BCF file on spark. See https://github.com/broadinstitute/gatk/issues/4303 for more details.");
}

if (outputFile.endsWith(BGZFCodec.DEFAULT_EXTENSION) || outputFile.endsWith(".gz")) {
Are we no longer handling vcf.gz extension files?
We do; this case is now handled in Disq. See e.g. https://github.com/disq-bio/disq/blob/master/src/test/java/org/disq_bio/disq/HtsjdkVariantsRddTest.java#L32.
@@ -65,16 +59,13 @@ public VariantsSparkSource(JavaSparkContext ctx) {
 * @return JavaRDD<VariantContext> of variants from all files.
 */
public JavaRDD<VariantContext> getParallelVariantContexts(final String vcf, final List<SimpleInterval> intervals) {
Configuration conf = new Configuration();
conf.setStrings("io.compression.codecs", BGZFEnhancedGzipCodec.class.getCanonicalName(),
These codecs? Where did they go?
Handled by Disq.
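For context, this is roughly what the Disq read path looks like in place of the codec configuration, assuming Disq's HtsjdkVariantsRddStorage API as exercised by the test linked above:

```java
import htsjdk.variant.variantcontext.VariantContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.disq_bio.disq.HtsjdkVariantsRdd;
import org.disq_bio.disq.HtsjdkVariantsRddStorage;

// Sketch: Disq detects plain, gzipped, and BGZF-compressed VCFs itself, so the
// explicit io.compression.codecs Hadoop configuration is no longer needed here.
static JavaRDD<VariantContext> readVariants(final JavaSparkContext ctx, final String vcf) throws java.io.IOException {
    final HtsjdkVariantsRdd variantsRdd = HtsjdkVariantsRddStorage.makeDefault(ctx).read(vcf);
    return variantsRdd.getVariants();
}
```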
@@ -152,7 +153,7 @@ private void assertSingleShardedWritingWorks(String inputBam, String referenceFi

// check that a splitting bai file is created
if (IOUtils.isBamFileName(outputPath)) {
    Assert.assertTrue(Files.exists(IOUtils.getPath(outputPath + SplittingBAMIndexer.OUTPUT_FILE_EXTENSION)));
    //Assert.assertTrue(Files.exists(IOUtils.getPath(outputPath + SBIIndex.FILE_EXTENSION)));
Commented-out code should probably be removed.
I opened disq-bio/disq#45 to do this. It shouldn't block this being merged though.
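Once that lands, the commented-out assertion above could presumably be re-enabled along these lines, using htsjdk's SBIIndex.FILE_EXTENSION (".sbi"); a sketch contingent on disq-bio/disq#45:

```java
// check that an .sbi splitting index is created alongside the BAM
if (IOUtils.isBamFileName(outputPath)) {
    Assert.assertTrue(Files.exists(IOUtils.getPath(outputPath + SBIIndex.FILE_EXTENSION)));
}
```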
import java.util.*;

public class GVCFBlockMergingIterator extends PushToPullIterator<VariantContext> {
I would add a comment here making explicit that you are reusing PushToPullIterator, which talks about downsampling in its comments and indeed appears to have that hardwired into its behavior. Or I would make the commenting on PushToPullIterator more generic (referring to downsampling as an example) to avoid confusion.
I've reworked the javadoc in #5311 to address this.
We should pull out the GVCFBlockMergingIterator refactor here into another PR so it can be given its own review.
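For reviewers of that future PR, the composition at play is roughly the following; the constructor shapes are my assumptions for illustration, not the branch's exact signatures:

```java
import htsjdk.variant.variantcontext.VariantContext;
import java.util.Iterator;
import java.util.List;

// Sketch: the iterator pushes variants through a GVCFBlockCombiner (a
// PushPullTransformer) and pulls merged GVCF blocks back out on demand.
public class GVCFBlockMergingIterator extends PushToPullIterator<VariantContext> {
    public GVCFBlockMergingIterator(final Iterator<VariantContext> variants,
                                    final List<Integer> gqPartitions,
                                    final int defaultPloidy) {
        super(variants, new GVCFBlockCombiner(gqPartitions, defaultPloidy));
    }
}
```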
import static htsjdk.variant.vcf.VCFConstants.MAX_GENOTYPE_QUAL;
import static org.broadinstitute.hellbender.utils.variant.writers.GVCFWriter.GVCF_BLOCK;

public final class GVCFBlockCombiner implements PushPullTransformer<VariantContext> {
Could you actually spin all of the push/pull iterator code into a separate branch? It deserves a separate PR from the rest of the code in this branch.
Agreed - the push/pull stuff needs to be reviewed separately.
Opened #5311 for the push/pull part.
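For anyone following along, the contract being reviewed there looks roughly like this; the method set is my understanding from GATK's downsampler transformers, so treat it as an assumption:

```java
import java.util.List;

// Sketch of the push/pull contract that GVCFBlockCombiner implements:
// items are pushed in, buffered and combined, then pulled out in batches.
public interface PushPullTransformer<T> {
    void submit(T item);             // push one item into the transformer
    boolean hasFinalizedItems();     // true once combined items are ready
    List<T> consumeFinalizedItems(); // pull the ready items, clearing the buffer
    void signalEndOfInput();         // flush any trailing partial state
}
```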
These changes look good to me.
Force-pushed from b9d467a to b64ce48.
…e-sorted, but it was not. Used 'sort -k1,1 -s' to sort by QNAME field.
Yay!