Replace Hadoop-BAM with Disq (#5138)
* Disq 0.1.0
* Move VCFHeaderReader
* Fix PileupSparkIntegrationTest
* Fix incorrectly sorted test SAM file: the header said it was queryname-sorted, but it was not. (Used 'sort -k1,1 -s' to sort by QNAME field.)
tomwhite authored Dec 4, 2018
1 parent ba4243d commit 39578f8
Showing 21 changed files with 4,047 additions and 4,495 deletions.
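
For orientation before the diffs: the commit swaps Hadoop-BAM's InputFormat-based reading for Disq's HtsjdkReadsRddStorage API. Below is a minimal, hedged sketch of that read path, using only calls that appear in the diff (the local Spark setup, class name, and input path are illustrative, not part of the commit):

import htsjdk.samtools.SAMFileHeader;
import htsjdk.samtools.SAMRecord;
import htsjdk.samtools.ValidationStringency;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.disq_bio.disq.HtsjdkReadsRdd;
import org.disq_bio.disq.HtsjdkReadsRddStorage;

public class DisqReadSketch {
    public static void main(String[] args) throws java.io.IOException {
        // Local Spark context purely for illustration.
        JavaSparkContext ctx = new JavaSparkContext(
                new SparkConf().setMaster("local[*]").setAppName("disq-read-sketch"));

        // One builder call replaces the newAPIHadoopFile + AnySAMInputFormat plumbing removed below.
        HtsjdkReadsRdd readsRdd = HtsjdkReadsRddStorage.makeDefault(ctx)
                .validationStringency(ValidationStringency.SILENT)
                .read("file:///path/to/reads.bam");   // hypothetical input path

        SAMFileHeader header = readsRdd.getHeader();      // header, as used by getHeader() below
        JavaRDD<SAMRecord> reads = readsRdd.getReads();   // records, as mapped to GATKRead below
        System.out.println(header.getSortOrder() + ": " + reads.count() + " reads");
        ctx.stop();
    }
}

Split size, the CRAM reference, and interval restrictions hang off the same builder (splitSize, referenceSourcePath, and a traversal-parameters argument to read), as the ReadsSparkSource changes below show.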
5 changes: 1 addition & 4 deletions build.gradle
@@ -279,10 +279,7 @@ dependencies {

compile 'org.jgrapht:jgrapht-core:0.9.1'

compile('org.seqdoop:hadoop-bam:' + hadoopBamVersion) {
exclude group: 'org.apache.hadoop'
exclude module: 'htsjdk'
}
compile('org.disq-bio:disq:0.1.0')
compile('org.apache.hadoop:hadoop-client:' + hadoopVersion) // should be a 'provided' dependency
compile('com.github.jsr203hadoop:jsr203hadoop:1.0.3')


Large diffs are not rendered by default.

org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSource.java
@@ -1,20 +1,13 @@
package org.broadinstitute.hellbender.engine.spark.datasources;

import htsjdk.samtools.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import htsjdk.samtools.SAMFileHeader;
import htsjdk.samtools.SAMRecord;
import htsjdk.samtools.ValidationStringency;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.parquet.avro.AvroParquetInputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
@@ -27,13 +20,12 @@
import org.broadinstitute.hellbender.utils.io.IOUtils;
import org.broadinstitute.hellbender.utils.read.*;
import org.broadinstitute.hellbender.utils.spark.SparkUtils;
import org.seqdoop.hadoop_bam.*;
import org.seqdoop.hadoop_bam.util.SAMHeaderReader;
import org.disq_bio.disq.HtsjdkReadsRdd;
import org.disq_bio.disq.HtsjdkReadsRddStorage;
import org.disq_bio.disq.HtsjdkReadsTraversalParameters;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;

@@ -42,7 +34,6 @@
*/
public final class ReadsSparkSource implements Serializable {
private static final long serialVersionUID = 1L;
private static final String HADOOP_PART_PREFIX = "part-";

private transient final JavaSparkContext ctx;
private ValidationStringency validationStringency = ReadConstants.DEFAULT_READ_VALIDATION_STRINGENCY;
@@ -70,33 +61,6 @@ public JavaRDD<GATKRead> getParallelReads(final String readFileName, final Strin
return getParallelReads(readFileName, referencePath, traversalParameters, 0);
}


/**
* this is a hack to work around https://github.com/HadoopGenomics/Hadoop-BAM/issues/199
*
* fix the problem by explicitly sorting the input file splits
*/
public static class SplitSortingSamInputFormat extends AnySAMInputFormat{
@SuppressWarnings("unchecked")
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
final List<InputSplit> splits = super.getSplits(job);

if( splits.stream().allMatch(split -> split instanceof FileVirtualSplit || split instanceof FileSplit)) {
splits.sort(Comparator.comparing(split -> {
if (split instanceof FileVirtualSplit) {
return ((FileVirtualSplit) split).getPath().getName();
} else {
return ((FileSplit) split).getPath().getName();
}
}));
}

return splits;
}
}


/**
* Loads Reads using Hadoop-BAM. For local files, bam must have the fully-qualified path,
* i.e., file:///path/to/bam.bam.
@@ -108,40 +72,22 @@ public List<InputSplit> getSplits(JobContext job) throws IOException {
* @return RDD of (SAMRecord-backed) GATKReads from the file.
*/
public JavaRDD<GATKRead> getParallelReads(final String readFileName, final String referencePath, final TraversalParameters traversalParameters, final long splitSize) {
SAMFileHeader header = getHeader(readFileName, referencePath);

// use the Hadoop configuration attached to the Spark context to maintain cumulative settings
final Configuration conf = ctx.hadoopConfiguration();
if (splitSize > 0) {
conf.set("mapreduce.input.fileinputformat.split.maxsize", Long.toString(splitSize));
}

final JavaPairRDD<LongWritable, SAMRecordWritable> rdd2;

setHadoopBAMConfigurationProperties(readFileName, referencePath);

boolean isBam = IOUtils.isBamFileName(readFileName);
if (isBam) {
if (traversalParameters == null) {
BAMInputFormat.unsetTraversalParameters(conf);
} else {
BAMInputFormat.setTraversalParameters(conf, traversalParameters.getIntervalsForTraversal(), traversalParameters.traverseUnmappedReads());
}
try {
String cramReferencePath = checkCramReference(ctx, readFileName, referencePath);
HtsjdkReadsTraversalParameters<SimpleInterval> tp = traversalParameters == null ? null :
new HtsjdkReadsTraversalParameters<>(traversalParameters.getIntervalsForTraversal(), traversalParameters.traverseUnmappedReads());
HtsjdkReadsRdd htsjdkReadsRdd = HtsjdkReadsRddStorage.makeDefault(ctx)
.splitSize((int) splitSize)
.validationStringency(validationStringency)
.referenceSourcePath(cramReferencePath)
.read(readFileName, tp);
JavaRDD<GATKRead> reads = htsjdkReadsRdd.getReads()
.map(read -> (GATKRead) SAMRecordToGATKReadAdapter.headerlessReadAdapter(read))
.filter(Objects::nonNull);
return fixPartitionsIfQueryGrouped(ctx, htsjdkReadsRdd.getHeader(), reads);
} catch (IOException | IllegalArgumentException e) {
throw new UserException("Failed to load reads from " + readFileName + "\n Caused by:" + e.getMessage(), e);
}

rdd2 = ctx.newAPIHadoopFile(
readFileName, SplitSortingSamInputFormat.class, LongWritable.class, SAMRecordWritable.class,
conf);

JavaRDD<GATKRead> reads= rdd2.map(v1 -> {
SAMRecord sam = v1._2().get();
if (isBam || samRecordOverlaps(sam, traversalParameters)) { // don't check overlaps for BAM since it is done by input format
return (GATKRead) SAMRecordToGATKReadAdapter.headerlessReadAdapter(sam);
}
return null;
}).filter(Objects::nonNull);

return fixPartitionsIfQueryGrouped(ctx, header, reads);
}

private static JavaRDD<GATKRead> fixPartitionsIfQueryGrouped(JavaSparkContext ctx, SAMFileHeader header, JavaRDD<GATKRead> reads) {
@@ -216,69 +162,36 @@ public SAMFileHeader getHeader(final String filePath, final String referencePath

// local file or HDFs case
try {
Path path = new Path(filePath);
FileSystem fs = path.getFileSystem(ctx.hadoopConfiguration());
if (fs.isDirectory(path)) {
FileStatus[] bamFiles = fs.listStatus(path, new PathFilter() {
private static final long serialVersionUID = 1L;
@Override
public boolean accept(Path path) {
return path.getName().startsWith(HADOOP_PART_PREFIX);
}
});
if (bamFiles.length == 0) {
throw new UserException("No BAM files to load header from in: " + path);
}
path = bamFiles[0].getPath(); // Hadoop-BAM writes the same header to each shard, so use the first one
}
setHadoopBAMConfigurationProperties(filePath, referencePath);
return SAMHeaderReader.readSAMHeaderFrom(path, ctx.hadoopConfiguration());
String cramReferencePath = checkCramReference(ctx, filePath, referencePath);
return HtsjdkReadsRddStorage.makeDefault(ctx)
.validationStringency(validationStringency)
.referenceSourcePath(cramReferencePath)
.read(filePath)
.getHeader();
} catch (IOException | IllegalArgumentException e) {
throw new UserException("Failed to read bam header from " + filePath + "\n Caused by:" + e.getMessage(), e);
}
}

/**
* Propagate any values that need to be passed to Hadoop-BAM through configuration properties:
*
* - the validation stringency property is always set using the current value of the
* validationStringency field
* - if the input file is a CRAM file, the reference value will also be set, and must be a URI
* which includes a scheme. if no scheme is provided a "file://" scheme will be used. for
* non-CRAM input the reference may be null.
* - if the input file is not CRAM, the reference property is *unset* to prevent Hadoop-BAM
* from passing a stale value through to htsjdk when multiple read calls are made serially
* with different inputs but the same Spark context
* Check that for CRAM the reference is set to a file that exists and is not 2bit.
* @return the <code>referencePath</code> or <code>null</code> if not CRAM
*/
private void setHadoopBAMConfigurationProperties(final String inputName, final String referenceName) {
// use the Hadoop configuration attached to the Spark context to maintain cumulative settings
final Configuration conf = ctx.hadoopConfiguration();
conf.set(SAMHeaderReader.VALIDATION_STRINGENCY_PROPERTY, validationStringency.name());

if (!IOUtils.isCramFileName(inputName)) {
// only set the reference for CRAM input
conf.unset(CRAMInputFormat.REFERENCE_SOURCE_PATH_PROPERTY);
}
else {
if (null == referenceName) {
static String checkCramReference(final JavaSparkContext ctx, final String filePath, final String referencePath) {
if (IOUtils.isCramFileName(filePath)) {
if (referencePath == null) {
throw new UserException.MissingReference("A reference is required for CRAM input");
}
else {
if ( ReferenceTwoBitSparkSource.isTwoBit(referenceName)) { // htsjdk can't handle 2bit reference files
throw new UserException("A 2bit file cannot be used as a CRAM file reference");
}
else { // Hadoop-BAM requires the reference to be a URI, including scheme
final Path refPath = new Path(referenceName);
if (!SparkUtils.pathExists(ctx, refPath)) {
throw new UserException.MissingReference("The specified fasta file (" + referenceName + ") does not exist.");
}
final String referenceURI = null == refPath.toUri().getScheme() ?
"file://" + new File(referenceName).getAbsolutePath() :
referenceName;
conf.set(CRAMInputFormat.REFERENCE_SOURCE_PATH_PROPERTY, referenceURI);
} else if (ReferenceTwoBitSparkSource.isTwoBit(referencePath)) { // htsjdk can't handle 2bit reference files
throw new UserException("A 2bit file cannot be used as a CRAM file reference");
} else {
final Path refPath = new Path(referencePath);
if (!SparkUtils.pathExists(ctx, refPath)) {
throw new UserException.MissingReference("The specified fasta file (" + referencePath + ") does not exist.");
}
}
return referencePath;
}
return null;
}
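
A hedged sketch of the CRAM path that checkCramReference guards: for CRAM input a reference must be supplied, and it is handed to Disq via referenceSourcePath (method names as in the diff above; the wrapper class and paths are hypothetical):

import htsjdk.samtools.ValidationStringency;
import org.apache.spark.api.java.JavaSparkContext;
import org.disq_bio.disq.HtsjdkReadsRdd;
import org.disq_bio.disq.HtsjdkReadsRddStorage;

public class CramReadSketch {
    // Mirrors the CRAM branch above: the reference is required and must not be a 2bit file.
    static HtsjdkReadsRdd readCram(JavaSparkContext ctx, String cramPath, String fastaPath)
            throws java.io.IOException {
        return HtsjdkReadsRddStorage.makeDefault(ctx)
                .validationStringency(ValidationStringency.LENIENT)
                .referenceSourcePath(fastaPath)   // e.g. "file:///path/to/ref.fasta" (hypothetical)
                .read(cramPath);                  // e.g. "file:///path/to/reads.cram" (hypothetical)
    }
}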

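Similarly, the new getParallelReads above maps GATK's TraversalParameters onto Disq's HtsjdkReadsTraversalParameters and passes them to read. A hedged sketch of an interval-restricted read under that API (contigs, coordinates, and the wrapper class are hypothetical; htsjdk's Interval stands in for GATK's SimpleInterval):

import java.util.Arrays;
import htsjdk.samtools.SAMRecord;
import htsjdk.samtools.util.Interval;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.disq_bio.disq.HtsjdkReadsRddStorage;
import org.disq_bio.disq.HtsjdkReadsTraversalParameters;

public class IntervalReadSketch {
    // Restrict the read to two regions and also traverse unmapped reads,
    // the same shape of call made by getParallelReads above.
    static JavaRDD<SAMRecord> readSubset(JavaSparkContext ctx, String bamPath)
            throws java.io.IOException {
        HtsjdkReadsTraversalParameters<Interval> traversal = new HtsjdkReadsTraversalParameters<>(
                Arrays.asList(new Interval("chr1", 1, 1_000_000), new Interval("chr2", 1, 1_000_000)),
                true /* traverse unmapped reads as well */);
        return HtsjdkReadsRddStorage.makeDefault(ctx)
                .read(bamPath, traversal)
                .getReads();
    }
}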
