diff --git a/src/main/java/org/broadinstitute/hellbender/engine/spark/AddContextDataToReadSpark.java b/src/main/java/org/broadinstitute/hellbender/engine/spark/AddContextDataToReadSpark.java
deleted file mode 100644
index 592e8af3d8e..00000000000
--- a/src/main/java/org/broadinstitute/hellbender/engine/spark/AddContextDataToReadSpark.java
+++ /dev/null
@@ -1,146 +0,0 @@
-package org.broadinstitute.hellbender.engine.spark;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterators;
-import htsjdk.samtools.SAMSequenceDictionary;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.PairFlatMapFunction;
-import org.apache.spark.broadcast.Broadcast;
-import org.broadinstitute.hellbender.engine.ReadContextData;
-import org.broadinstitute.hellbender.engine.Shard;
-import org.broadinstitute.hellbender.engine.ShardBoundary;
-import org.broadinstitute.hellbender.engine.spark.datasources.ReferenceMultiSparkSource;
-import org.broadinstitute.hellbender.engine.filters.ReadFilterLibrary;
-import org.broadinstitute.hellbender.engine.spark.datasources.ReferenceWindowFunctions;
-import org.broadinstitute.hellbender.exceptions.UserException;
-import org.broadinstitute.hellbender.utils.IntervalUtils;
-import org.broadinstitute.hellbender.utils.SimpleInterval;
-import org.broadinstitute.hellbender.utils.collections.IntervalsSkipList;
-import org.broadinstitute.hellbender.utils.read.GATKRead;
-import org.broadinstitute.hellbender.utils.reference.ReferenceBases;
-import org.broadinstitute.hellbender.utils.variant.GATKVariant;
-import scala.Tuple2;
-
-import javax.annotation.Nullable;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.stream.Collectors;
-
-/**
- * AddContextDataToRead pairs reference bases and overlapping variants with each GATKRead in the RDD input.
- * The variants are obtained from a local file (later a GCS Bucket). The reference bases come from the Google Genomics API.
- *
- * This transform is intended for direct use in pipelines.
- *
- * This transform will filter out any unmapped reads.
- *
- * The reference bases paired with each read can be customized by passing in a reference window function
- * inside the {@link ReferenceMultiSparkSource} argument to {@link #add}. See
- * {@link ReferenceWindowFunctions} for examples.
- */
-public class AddContextDataToReadSpark {
-    /**
-     * Add context data ({@link ReadContextData}) to reads.
-     * @param ctx the Spark context
-     * @param reads the coordinate-sorted reads
-     * @param referenceSource the reference source
-     * @param variants the coordinate-sorted variants
-     * @param variantsPaths the paths to variants files
-     * @param joinStrategy the strategy to use to join context data to reads
-     * @param sequenceDictionary the sequence dictionary for the reads (only used for OVERLAPS_PARTITIONER join strategy, use null otherwise)
-     * @param shardSize the maximum size of each shard, in bases (only used for OVERLAPS_PARTITIONER join strategy, use 0 otherwise)
-     * @param shardPadding amount of extra context around each shard, in bases (only used for OVERLAPS_PARTITIONER join strategy, use 0 otherwise)
-     * @return an RDD of read-context pairs, in coordinate-sorted order
-     */
-    public static JavaPairRDD<GATKRead, ReadContextData> add(
-            final JavaSparkContext ctx,
-            final JavaRDD<GATKRead> reads, final ReferenceMultiSparkSource referenceSource,
-            final JavaRDD<GATKVariant> variants, final List<String> variantsPaths, final JoinStrategy joinStrategy,
-            final SAMSequenceDictionary sequenceDictionary,
-            final int shardSize, final int shardPadding) {
-        // TODO: this static method should not be filtering the unmapped reads. To be addressed in another issue.
-        JavaRDD<GATKRead> mappedReads = reads.filter(read -> ReadFilterLibrary.MAPPED.test(read));
-        JavaPairRDD<GATKRead, Tuple2<Iterable<GATKVariant>, ReferenceBases>> withVariantsWithRef;
-        if (joinStrategy.equals(JoinStrategy.BROADCAST)) {
-            // Join Reads and Variants
-            JavaPairRDD<GATKRead, Iterable<GATKVariant>> withVariants = variantsPaths == null ? BroadcastJoinReadsWithVariants.join(mappedReads, variants) : BroadcastJoinReadsWithVariants.join(mappedReads, variantsPaths);
-            // Join Reads with ReferenceBases
-            withVariantsWithRef = BroadcastJoinReadsWithRefBases.addBases(referenceSource, withVariants);
-        } else if (joinStrategy.equals(JoinStrategy.SHUFFLE)) {
-            // Join Reads and Variants
-            JavaPairRDD<GATKRead, Iterable<GATKVariant>> withVariants = ShuffleJoinReadsWithVariants.join(mappedReads, variants);
-            // Join Reads with ReferenceBases
-            withVariantsWithRef = ShuffleJoinReadsWithRefBases.addBases(referenceSource, withVariants);
-        } else if (joinStrategy.equals(JoinStrategy.OVERLAPS_PARTITIONER)) {
-            return addUsingOverlapsPartitioning(ctx, reads, referenceSource, variants, variantsPaths, sequenceDictionary, shardSize, shardPadding);
-        } else {
-            throw new UserException("Unknown JoinStrategy");
-        }
-        return withVariantsWithRef.mapToPair(in -> new Tuple2<>(in._1(), new ReadContextData(in._2()._2(), in._2()._1())));
-    }
-
-    /**
-     * Add context data ({@link ReadContextData}) to reads, using overlaps partitioning to avoid a shuffle.
-     * @param ctx the Spark context
-     * @param mappedReads the coordinate-sorted reads
-     * @param referenceSource the reference source
-     * @param variants the coordinate-sorted variants
-     * @param variantsPaths the paths to variants files, if null then the variants RDD is used
-     * @param sequenceDictionary the sequence dictionary for the reads
-     * @param shardSize the maximum size of each shard, in bases
-     * @param shardPadding amount of extra context around each shard, in bases
-     * @return an RDD of read-context pairs, in coordinate-sorted order
-     */
-    private static JavaPairRDD<GATKRead, ReadContextData> addUsingOverlapsPartitioning(
-            final JavaSparkContext ctx,
-            final JavaRDD<GATKRead> mappedReads, final ReferenceMultiSparkSource referenceSource,
-            final JavaRDD<GATKVariant> variants, final List<String> variantsPaths, final SAMSequenceDictionary sequenceDictionary,
-            final int shardSize, final int shardPadding) {
-
-        final List<SimpleInterval> intervals = IntervalUtils.getAllIntervalsForReference(sequenceDictionary);
-        // use unpadded shards (padding is only needed for reference bases)
-        final List<ShardBoundary> intervalShards = intervals.stream()
-                .flatMap(interval -> Shard.divideIntervalIntoShards(interval, shardSize, 0, sequenceDictionary).stream())
-                .collect(Collectors.toList());
-
-        final Broadcast<ReferenceMultiSparkSource> bReferenceSource = ctx.broadcast(referenceSource);
-        final Broadcast<IntervalsSkipList<GATKVariant>> variantsBroadcast = variantsPaths == null ? ctx.broadcast(new IntervalsSkipList<>(variants.collect())) : null;
-
-        int maxLocatableSize = Math.min(shardSize, shardPadding);
-        JavaRDD<Shard<GATKRead>> shardedReads = SparkSharder.shard(ctx, mappedReads, GATKRead.class, sequenceDictionary, intervalShards, maxLocatableSize);
-        return shardedReads.flatMapToPair(
-                new PairFlatMapFunction<Shard<GATKRead>, GATKRead, ReadContextData>() {
-                    private static final long serialVersionUID = 1L;
-
-                    @Override
-                    public Iterator<Tuple2<GATKRead, ReadContextData>> call(Shard<GATKRead> shard) throws Exception {
-                        // get reference bases for this shard (padded)
-                        SimpleInterval paddedInterval = shard.getInterval().expandWithinContig(shardPadding, sequenceDictionary);
-                        ReferenceBases referenceBases = bReferenceSource.getValue().getReferenceBases(paddedInterval);
-                        final IntervalsSkipList<GATKVariant> intervalsSkipList = variantsPaths == null ?
-                                variantsBroadcast.getValue() :
-                                KnownSitesCache.getVariants(variantsPaths);
-                        Iterator<Tuple2<GATKRead, ReadContextData>> transform = Iterators.transform(shard.iterator(), new Function<GATKRead, Tuple2<GATKRead, ReadContextData>>() {
-                            @Nullable
-                            @Override
-                            public Tuple2<GATKRead, ReadContextData> apply(@Nullable GATKRead r) {
-                                List<GATKVariant> overlappingVariants;
-                                if (SimpleInterval.isValid(r.getContig(), r.getStart(), r.getEnd())) {
-                                    overlappingVariants = intervalsSkipList.getOverlapping(new SimpleInterval(r));
-                                } else {
-                                    // Sometimes we have reads that do not form valid intervals (reads that do not consume any ref bases, e.g. CIGAR 61S90I).
-                                    // In those cases, we'll just say that nothing overlaps the read.
-                                    overlappingVariants = Collections.emptyList();
-                                }
-                                return new Tuple2<>(r, new ReadContextData(referenceBases, overlappingVariants));
-                            }
-                        });
-                        // only include reads that start in the shard
-                        return Iterators.filter(transform, r -> r._1().getStart() >= shard.getStart()
-                                && r._1().getStart() <= shard.getEnd());
-                    }
-                });
-    }
-}
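For reviewers skimming the removed API: a minimal sketch of how a pipeline invoked this transform before the deletion. The argument order comes straight from the signature above; the `ctx`, `reads`, `variants`, and `referenceSource` values are assumed to be built elsewhere in the pipeline.

```java
// Hypothetical usage sketch of the removed AddContextDataToReadSpark.add API.
JavaPairRDD<GATKRead, ReadContextData> readsWithContext = AddContextDataToReadSpark.add(
        ctx,                        // JavaSparkContext
        reads,                      // JavaRDD<GATKRead>, coordinate-sorted
        referenceSource,            // ReferenceMultiSparkSource
        variants,                   // JavaRDD<GATKVariant>, coordinate-sorted
        null,                       // variantsPaths: null means use the variants RDD
        JoinStrategy.BROADCAST,     // join strategy
        null,                       // sequence dictionary: only needed for OVERLAPS_PARTITIONER
        0, 0);                      // shardSize/shardPadding: only used by OVERLAPS_PARTITIONER
```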
diff --git a/src/main/java/org/broadinstitute/hellbender/engine/spark/BroadcastJoinReadsWithRefBases.java b/src/main/java/org/broadinstitute/hellbender/engine/spark/BroadcastJoinReadsWithRefBases.java
deleted file mode 100644
index ceca975e7d7..00000000000
--- a/src/main/java/org/broadinstitute/hellbender/engine/spark/BroadcastJoinReadsWithRefBases.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package org.broadinstitute.hellbender.engine.spark;
-
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.broadcast.Broadcast;
-import org.broadinstitute.hellbender.engine.spark.datasources.ReferenceMultiSparkSource;
-import org.broadinstitute.hellbender.utils.SimpleInterval;
-import org.broadinstitute.hellbender.utils.read.GATKRead;
-import org.broadinstitute.hellbender.utils.reference.ReferenceBases;
-import scala.Tuple2;
-
-/**
- * Joins an RDD of GATKReads to reference data using a broadcast strategy.
- *
- * The ReferenceDataflowSource is broadcast using Spark's Broadcast variable mechanism. The reads are then mapped
- * over and a reference query is executed on each read. This makes sense for ReferenceDataflowSource implementations
- * that contain the reference data in memory (e.g., ReferenceTwoBitSource), but will likely be much slower for
- * implementations that have to query other resources for the reference sequences.
- */
-public class BroadcastJoinReadsWithRefBases {
-
-    /**
-     * Joins each read of an RDD with that read's corresponding reference sequence.
-     *
-     * @param referenceDataflowSource The source of the reference sequence information
-     * @param reads The reads for which to extract reference sequence information
-     * @return The JavaPairRDD that contains each read along with the corresponding ReferenceBases object
-     */
-    public static JavaPairRDD<GATKRead, ReferenceBases> addBases(final ReferenceMultiSparkSource referenceDataflowSource,
-                                                                 final JavaRDD<GATKRead> reads) {
-        JavaSparkContext ctx = new JavaSparkContext(reads.context());
-        Broadcast<ReferenceMultiSparkSource> bReferenceSource = ctx.broadcast(referenceDataflowSource);
-        return reads.mapToPair(read -> {
-            SimpleInterval interval = bReferenceSource.getValue().getReferenceWindowFunction().apply(read);
-            return new Tuple2<>(read, bReferenceSource.getValue().getReferenceBases(interval));
-        });
-    }
-
-    /**
-     * Joins each read of a read-keyed RDD with the key's corresponding reference sequence.
-     *
-     * @param referenceDataflowSource The source of the reference sequence information
-     * @param keyedByRead The read-keyed RDD for which to extract reference sequence information
-     * @return The JavaPairRDD that contains each read along with the corresponding ReferenceBases object and the value
-     */
-    public static <T> JavaPairRDD<GATKRead, Tuple2<T, ReferenceBases>> addBases(final ReferenceMultiSparkSource referenceDataflowSource,
-                                                                                final JavaPairRDD<GATKRead, T> keyedByRead) {
-        JavaSparkContext ctx = new JavaSparkContext(keyedByRead.context());
-        Broadcast<ReferenceMultiSparkSource> bReferenceSource = ctx.broadcast(referenceDataflowSource);
-        return keyedByRead.mapToPair(pair -> {
-            SimpleInterval interval = bReferenceSource.getValue().getReferenceWindowFunction().apply(pair._1());
-            return new Tuple2<>(pair._1(), new Tuple2<>(pair._2(), bReferenceSource.getValue().getReferenceBases(
-                    interval)));
-        });
-    }
-}
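The broadcast pattern used above generalizes beyond reference data: serialize a small lookup structure once per executor, then probe it from a narrow (shuffle-free) map over the large RDD. A self-contained sketch of the same pattern with illustrative names (not part of the codebase):

```java
import java.util.Map;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import scala.Tuple2;

// Generic broadcast-join sketch: ship a small lookup table to every executor
// and probe it locally from mapToPair, incurring no shuffle of the large RDD.
public final class BroadcastJoinSketch {
    public static JavaPairRDD<String, Integer> join(JavaSparkContext ctx,
                                                    JavaRDD<String> keys,
                                                    Map<String, Integer> smallTable) {
        final Broadcast<Map<String, Integer>> bTable = ctx.broadcast(smallTable);
        // Each task reads the broadcast value from local memory.
        return keys.mapToPair(k -> new Tuple2<>(k, bTable.getValue().get(k)));
    }
}
```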
diff --git a/src/main/java/org/broadinstitute/hellbender/engine/spark/BroadcastJoinReadsWithVariants.java b/src/main/java/org/broadinstitute/hellbender/engine/spark/BroadcastJoinReadsWithVariants.java
deleted file mode 100644
index b7686e30ac9..00000000000
--- a/src/main/java/org/broadinstitute/hellbender/engine/spark/BroadcastJoinReadsWithVariants.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package org.broadinstitute.hellbender.engine.spark;
-
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.broadcast.Broadcast;
-import org.broadinstitute.hellbender.utils.collections.IntervalsSkipList;
-import org.broadinstitute.hellbender.utils.SimpleInterval;
-import org.broadinstitute.hellbender.utils.read.GATKRead;
-import org.broadinstitute.hellbender.utils.variant.GATKVariant;
-import scala.Tuple2;
-
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Joins an RDD of GATKReads to variant data using a broadcast strategy.
- *
- * The variants RDD is materialized as a List then broadcast using Spark's Broadcast variable mechanism. The reads are
- * then mapped over and overlapping variants are added for each read.
- */
-public final class BroadcastJoinReadsWithVariants {
-    private BroadcastJoinReadsWithVariants(){}
-
-    /**
-     * Joins each read of an RDD with overlapping variants from an RDD of GATKVariants. Broadcasts the
-     * variants, so this is only suitable for collections of variants that are <2GB (due to Spark's broadcast limitation).
-     *
-     * @param reads the RDD of reads, in coordinate-sorted order
-     * @param variants the RDD of variants
-     * @return an RDD that contains each read along with the overlapping variants
-     */
-    public static JavaPairRDD<GATKRead, Iterable<GATKVariant>> join(final JavaRDD<GATKRead> reads, final JavaRDD<GATKVariant> variants) {
-        final JavaSparkContext ctx = new JavaSparkContext(reads.context());
-        final Broadcast<IntervalsSkipList<GATKVariant>> variantsBroadcast = ctx.broadcast(new IntervalsSkipList<>(variants.collect()));
-        return reads.mapToPair(r -> getOverlapping(r, variantsBroadcast.getValue()));
-    }
-
-    /**
-     * Joins each read of an RDD with overlapping variants loaded from variants files. Can be used for any size of
-     * variants (although they are still read into memory) since Spark broadcast is not used.
-     *
-     * @param reads the RDD of reads, in coordinate-sorted order
-     * @param variantsPaths the paths to the variants files
-     * @return an RDD that contains each read along with the overlapping variants
-     */
-    public static JavaPairRDD<GATKRead, Iterable<GATKVariant>> join(final JavaRDD<GATKRead> reads, final List<String> variantsPaths) {
-        return reads.mapToPair(r -> getOverlapping(r, KnownSitesCache.getVariants(variantsPaths)));
-    }
-
-    private static Tuple2<GATKRead, Iterable<GATKVariant>> getOverlapping(final GATKRead read, final IntervalsSkipList<GATKVariant> intervalsSkipList) {
-        if (SimpleInterval.isValid(read.getContig(), read.getStart(), read.getEnd())) {
-            return new Tuple2<>(read, intervalsSkipList.getOverlapping(new SimpleInterval(read)));
-        } else {
-            // Sometimes we have reads that do not form valid intervals (reads that do not consume any ref bases, e.g. CIGAR 61S90I).
-            // In those cases, we'll just say that nothing overlaps the read.
-            return new Tuple2<>(read, Collections.emptyList());
-        }
-    }
-}
\ No newline at end of file
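The overlap lookup this join relies on is an interval skip list built once from the whole variant collection and then queried per read. A fragment using only the API visible in this diff (`variantList` is an assumed `List<GATKVariant>`):

```java
// Build the index once, then answer "which variants overlap this interval?"
// queries without scanning the whole collection.
IntervalsSkipList<GATKVariant> index = new IntervalsSkipList<>(variantList);
List<GATKVariant> hits = index.getOverlapping(new SimpleInterval("1", 100, 200));
```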
diff --git a/src/main/java/org/broadinstitute/hellbender/engine/spark/JoinStrategy.java b/src/main/java/org/broadinstitute/hellbender/engine/spark/JoinStrategy.java
deleted file mode 100644
index 3c72d26e916..00000000000
--- a/src/main/java/org/broadinstitute/hellbender/engine/spark/JoinStrategy.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package org.broadinstitute.hellbender.engine.spark;
-
-/**
- * Possible join strategies when using Spark
- */
-public enum JoinStrategy {
-    /**
-     * Use a broadcast join strategy, where one side of the join is collected into memory and broadcast to all workers.
-     */
-    BROADCAST,
-
-    /**
-     * Use an overlaps partitioner strategy, where one side of the join is sharded in partitions and the other side is broadcast.
-     */
-    OVERLAPS_PARTITIONER,
-
-    /**
-     * Use a shuffle join strategy, where both sides of the join are shuffled across the workers.
-     */
-    SHUFFLE
-}
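The enum documents the trade-off but not when to pick each value. A hypothetical heuristic a caller might use (this helper is not in the codebase; the 2GB figure comes from the Spark broadcast limit noted in BroadcastJoinReadsWithVariants above):

```java
// Illustrative strategy selection, not part of GATK.
static JoinStrategy chooseStrategy(long estimatedVariantBytes, boolean readsAreSorted) {
    if (estimatedVariantBytes < 2L * 1024 * 1024 * 1024) {
        return JoinStrategy.BROADCAST;          // small enough to ship to every executor
    }
    return readsAreSorted ? JoinStrategy.OVERLAPS_PARTITIONER  // avoids a full shuffle
                          : JoinStrategy.SHUFFLE;              // fully general fallback
}
```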
diff --git a/src/main/java/org/broadinstitute/hellbender/engine/spark/KnownSitesCache.java b/src/main/java/org/broadinstitute/hellbender/engine/spark/KnownSitesCache.java
deleted file mode 100644
index 4f4e140a8e2..00000000000
--- a/src/main/java/org/broadinstitute/hellbender/engine/spark/KnownSitesCache.java
+++ /dev/null
@@ -1,59 +0,0 @@
-package org.broadinstitute.hellbender.engine.spark;
-
-import htsjdk.variant.variantcontext.VariantContext;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.broadinstitute.hellbender.engine.FeatureDataSource;
-import org.broadinstitute.hellbender.utils.collections.IntervalsSkipList;
-import org.broadinstitute.hellbender.utils.variant.GATKVariant;
-import org.broadinstitute.hellbender.utils.variant.VariantContextVariantAdapter;
-
-import java.util.*;
-import java.util.stream.Collectors;
-
-/**
- * A cache of known sites by file path, with the property that there is only one copy of each collection of known sites per JVM.
- * This class is an alternative for cases that can't use a Spark broadcast due to its 2GB limitation.
- */
-class KnownSitesCache {
-
-    private static final Logger log = LogManager.getLogger(KnownSitesCache.class);
-
-    private static final Map<List<String>, IntervalsSkipList<GATKVariant>> PATHS_TO_VARIANTS = new HashMap<>();
-
-    public static synchronized IntervalsSkipList<GATKVariant> getVariants(List<String> paths) {
-        if (PATHS_TO_VARIANTS.containsKey(paths)) {
-            return PATHS_TO_VARIANTS.get(paths);
-        }
-        IntervalsSkipList<GATKVariant> variants = retrieveVariants(paths);
-        PATHS_TO_VARIANTS.put(paths, variants);
-        return variants;
-    }
-
-    private static IntervalsSkipList<GATKVariant> retrieveVariants(List<String> paths) {
-        return new IntervalsSkipList<>(paths
-                .stream()
-                .map(KnownSitesCache::loadFromFeatureDataSource)
-                .flatMap(Collection::stream)
-                .collect(Collectors.toList()));
-    }
-
-    private static List<GATKVariant> loadFromFeatureDataSource(String path) {
-        int cloudPrefetchBuffer = 40; // only used for GCS
-        try ( final FeatureDataSource<VariantContext> dataSource = new FeatureDataSource<>(path, null, 0, null, cloudPrefetchBuffer, cloudPrefetchBuffer) ) {
-            return wrapQueryResults(dataSource.iterator());
-        }
-    }
-
-    private static List<GATKVariant> wrapQueryResults(final Iterator<VariantContext> queryResults) {
-        final List<GATKVariant> wrappedResults = new ArrayList<>();
-        long count = 0;
-        while ( queryResults.hasNext() ) {
-            if (count++ % 100000 == 0) {
-                log.info("Number of variants read: " + count);
-            }
-            wrappedResults.add(VariantContextVariantAdapter.sparkVariantAdapter(queryResults.next()));
-        }
-        return wrappedResults;
-    }
-}
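KnownSitesCache is a per-JVM memoization: one synchronized map from key to the expensively-loaded value, so each executor pays the load cost once no matter how many tasks ask. The same idea, reduced to a generic sketch (class and names are illustrative only):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Per-JVM memoization sketch matching KnownSitesCache's approach: the
// synchronized accessor guarantees the loader runs at most once per key
// per JVM, even under concurrent task threads.
final class JvmCache<K, V> {
    private final Map<K, V> cache = new HashMap<>();
    private final Function<K, V> loader;

    JvmCache(Function<K, V> loader) { this.loader = loader; }

    synchronized V get(K key) {
        return cache.computeIfAbsent(key, loader); // load at most once per JVM
    }
}
```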
diff --git a/src/main/java/org/broadinstitute/hellbender/engine/spark/ShuffleJoinReadsWithRefBases.java b/src/main/java/org/broadinstitute/hellbender/engine/spark/ShuffleJoinReadsWithRefBases.java
deleted file mode 100644
index 5787f685ca9..00000000000
--- a/src/main/java/org/broadinstitute/hellbender/engine/spark/ShuffleJoinReadsWithRefBases.java
+++ /dev/null
@@ -1,131 +0,0 @@
-package org.broadinstitute.hellbender.engine.spark;
-
-import org.broadinstitute.hellbender.engine.spark.datasources.ReferenceWindowFunctions;
-import org.broadinstitute.hellbender.utils.SerializableFunction;
-import com.google.common.collect.Lists;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.broadinstitute.hellbender.engine.ReferenceShard;
-import org.broadinstitute.hellbender.engine.spark.datasources.ReferenceMultiSparkSource;
-import org.broadinstitute.hellbender.utils.IntervalUtils;
-import org.broadinstitute.hellbender.utils.SimpleInterval;
-import org.broadinstitute.hellbender.utils.Utils;
-import org.broadinstitute.hellbender.utils.read.GATKRead;
-import org.broadinstitute.hellbender.utils.reference.ReferenceBases;
-import scala.Tuple2;
-
-import java.util.List;
-import java.util.stream.Collectors;
-
-/**
- * RefBasesForReads queries the Google Genomics API for reference bases overlapping all of the reads.
- *
- * step 1: key reads by reference shards
- *
- * |--------- shard 0 ----------|---------- shard 1 ----------|--------- shard 2 ----------|--------- shard 3 ----------|
- *           |------ read a -----|                             |-- read b --|   |---- read c ----|
- *
- * step 2: group reads by the shard they start in
- *
- * |--------- shard 0 ----------|
- *           |------ read a -----|
- *
- *
- * |--------- shard 2 ----------|
- *  |-- read b --|   |---- read c ----|
- *
- * step 3: query the Google Genomics API for all bases needed for each shard
- *
- *           |--- ref bases 1 ---|                             |--------- ref bases 2 -----------|
- *           |------ read a -----|                             |-- read b --|   |---- read c ----|
- *
- * step 4: pair the ref bases needed for each read with the read
- *
- *           |------ read a -----|                             |-- read b --|   |---- read c ----|
- *           |-- ref bases 1a ---|                             |ref bases 2b|   |- ref bases 2c -|
- *
- * or in code,
- * KV<read a, ref bases 1a>
- * KV<read b, ref bases 2b>
- * KV<read c, ref bases 2c>
- *
- * The reference bases paired with each read can be customized by passing in a reference window function
- * inside the {@link ReferenceMultiSparkSource} argument to {@link #addBases}. See {@link ReferenceWindowFunctions} for examples.
- */
-public final class ShuffleJoinReadsWithRefBases {
-
-    /**
-     * Joins each read of an RDD with that read's corresponding reference sequence.
-     *
-     * @param referenceDataflowSource The source of the reference sequence information
-     * @param reads The reads for which to extract reference sequence information
-     * @return The JavaPairRDD that contains each read along with the corresponding ReferenceBases object
-     */
-    public static JavaPairRDD<GATKRead, ReferenceBases> addBases(final ReferenceMultiSparkSource referenceDataflowSource,
-                                                                 final JavaRDD<GATKRead> reads) {
-        // TODO: reimpl this method by calling out to the more complex version?
-        SerializableFunction<GATKRead, SimpleInterval> windowFunction = referenceDataflowSource.getReferenceWindowFunction();
-
-        JavaPairRDD<ReferenceShard, GATKRead> shardRead = reads.mapToPair(gatkRead -> {
-            ReferenceShard shard = ReferenceShard.getShardNumberFromInterval(windowFunction.apply(gatkRead));
-            return new Tuple2<>(shard, gatkRead);
-        });
-
-        JavaPairRDD<ReferenceShard, Iterable<GATKRead>> shardiRead = shardRead.groupByKey();
-
-        return shardiRead.flatMapToPair(in -> {
-            List<Tuple2<GATKRead, ReferenceBases>> out = Lists.newArrayList();
-            Iterable<GATKRead> iReads = in._2();
-
-            // Apply the reference window function to each read to produce a set of intervals representing
-            // the desired reference bases for each read.
-            final List<SimpleInterval> readWindows = Utils.stream(iReads).map(read -> windowFunction.apply(read)).collect(Collectors.toList());
-
-            SimpleInterval interval = IntervalUtils.getSpanningInterval(readWindows);
-            ReferenceBases bases = referenceDataflowSource.getReferenceBases(interval);
-            for (GATKRead r : iReads) {
-                final ReferenceBases subset = bases.getSubset(windowFunction.apply(r));
-                out.add(new Tuple2<>(r, subset));
-            }
-            return out.iterator();
-        });
-    }
-
-    /**
-     * Joins each read of a read-keyed RDD with the key's corresponding reference sequence.
-     *
-     * @param referenceDataflowSource The source of the reference sequence information
-     * @param keyedByRead The read-keyed RDD for which to extract reference sequence information
-     * @return The JavaPairRDD that contains each read along with the corresponding ReferenceBases object and the value
-     */
-    public static <T> JavaPairRDD<GATKRead, Tuple2<T, ReferenceBases>> addBases(final ReferenceMultiSparkSource referenceDataflowSource,
-                                                                                final JavaPairRDD<GATKRead, T> keyedByRead) {
-        SerializableFunction<GATKRead, SimpleInterval> windowFunction = referenceDataflowSource.getReferenceWindowFunction();
-
-        JavaPairRDD<ReferenceShard, Tuple2<GATKRead, T>> shardRead = keyedByRead.mapToPair(pair -> {
-            ReferenceShard shard = ReferenceShard.getShardNumberFromInterval(windowFunction.apply(pair._1()));
-            return new Tuple2<>(shard, pair);
-        });
-
-        JavaPairRDD<ReferenceShard, Iterable<Tuple2<GATKRead, T>>> shardiRead = shardRead.groupByKey();
-
-        return shardiRead.flatMapToPair(in -> {
-            List<Tuple2<GATKRead, Tuple2<T, ReferenceBases>>> out = Lists.newArrayList();
-            Iterable<Tuple2<GATKRead, T>> iReads = in._2();
-
-            // Apply the reference window function to each read to produce a set of intervals representing
-            // the desired reference bases for each read.
-            final List<SimpleInterval> readWindows = Utils.stream(iReads).map(pair -> windowFunction.apply(pair._1())).collect(Collectors.toList());
-
-            SimpleInterval interval = IntervalUtils.getSpanningInterval(readWindows);
-            // TODO: don't we need to support GCS PipelineOptions?
-            ReferenceBases bases = referenceDataflowSource.getReferenceBases(interval);
-            for (Tuple2<GATKRead, T> p : iReads) {
-                final ReferenceBases subset = bases.getSubset(windowFunction.apply(p._1()));
-                out.add(new Tuple2<>(p._1(), new Tuple2<>(p._2(), subset)));
-            }
-            return out.iterator();
-        });
-    }
-}
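The core trick in steps 3-4 above is issuing one spanning reference query per shard, then slicing per read, rather than one remote query per read. A plain-Java illustration of that slice arithmetic, where `windows` is a `List<SimpleInterval>` for one shard and `fetchBases` is a hypothetical stand-in for the remote query:

```java
// One remote fetch per shard (step 3), then zero-cost per-read subsets (step 4).
int spanStart = windows.stream().mapToInt(SimpleInterval::getStart).min().getAsInt();
int spanEnd   = windows.stream().mapToInt(SimpleInterval::getEnd).max().getAsInt();
byte[] spanBases = fetchBases(contig, spanStart, spanEnd);  // hypothetical remote query

for (SimpleInterval w : windows) {
    byte[] slice = java.util.Arrays.copyOfRange(
            spanBases, w.getStart() - spanStart, w.getEnd() - spanStart + 1);
    // slice now holds exactly the bases for this read's window
}
```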
diff --git a/src/main/java/org/broadinstitute/hellbender/engine/spark/ShuffleJoinReadsWithVariants.java b/src/main/java/org/broadinstitute/hellbender/engine/spark/ShuffleJoinReadsWithVariants.java
deleted file mode 100644
index 94375542600..00000000000
--- a/src/main/java/org/broadinstitute/hellbender/engine/spark/ShuffleJoinReadsWithVariants.java
+++ /dev/null
@@ -1,132 +0,0 @@
-package org.broadinstitute.hellbender.engine.spark;
-
-import com.google.common.collect.Lists;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.broadinstitute.hellbender.engine.VariantShard;
-import org.broadinstitute.hellbender.utils.SimpleInterval;
-import org.broadinstitute.hellbender.utils.read.GATKRead;
-import org.broadinstitute.hellbender.utils.variant.GATKVariant;
-import scala.Tuple2;
-
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Set;
-
-/**
- * PairReadsAndVariants takes two RDDs (GATKRead and Variant) and returns an RDD with a
- * (GATKRead, Iterable<Variant>) for every read and all the variants that overlap. We do this by first
- * making Tuple2s of GATKRead, Variant, which means that a read or variant may be present
- * multiple times in the output. Also, there may be duplicate (GATKRead, Variant) pairs in the output.
- * Currently, all reads must be mapped. We then aggregate by key to remove duplicate variants.
- *
- * The function works by creating a single RDD of Tuple2 where GATKRead is the key and Variant is the value.
- * We do this join by sharding both collections by "variant shard" and checking for overlap on each "shard."
- *
- * step 1: key reads and variants by shard
- * |---- shard 0 -----|---- shard 1 -----|---- shard 2 -----|---- shard 3 -----|---- shard 4 -----|
- *     |---------- read a ---------|               |----- read b ------|
- *   |- variant 1 -|       |- variant 2 -|               |- variant 3 -|
- *
- * step 2: shard read and variant by variant shard
- * |---- shard 0 -----|
- *     |---------- read a ---------|
- *   |- variant 1 -|
- *
- *
- * |---- shard 1 -----|
- * |---------- read a ---------|
- *       |- variant 2 -|
- *
- *
- * |---- shard 2 -----|
- *        |----- read b ------|
- *               |- variant 3 -|
- *
- * |---- shard 3 -----|
- * |----- read b ------|
- *      |- variant 3 -|
- *
- * step 3: pair reads and variants
- * Tuple2<read a, variant 1> // from shard 0
- * Tuple2<read a, variant 2> // from shard 1
- * Tuple2<read b, variant 3> // from shard 2
- * Tuple2<read b, variant 3> // from shard 3
- *
- * step 4: aggregate by key
- * Tuple2<read a, Iterable<variant 1, variant 2>>
- * Tuple2<read b, Iterable<variant 3>>
- */
-public class ShuffleJoinReadsWithVariants {
-    public static JavaPairRDD<GATKRead, Iterable<GATKVariant>> join(
-            final JavaRDD<GATKRead> reads, final JavaRDD<GATKVariant> variants) {
-
-        JavaPairRDD<VariantShard, GATKRead> readsWShards = pairReadsWithVariantShards(reads);
-
-        JavaPairRDD<VariantShard, GATKVariant> variantsWShards = pairVariantsWithVariantShards(variants);
-
-        // generate read-variant pairs; however, the reads are replicated for each overlapping pair
-        JavaPairRDD<GATKRead, GATKVariant> allPairs = pairReadsWithVariants(readsWShards, variantsWShards);
-
-        // we group together all variants for each unique GATKRead. As we combine through the Variants, they are added
-        // to a HashSet that gets continually merged together
-        return allPairs.aggregateByKey(new LinkedHashSet<>(), (vs, v) -> {
-            if (v != null) { // pairReadsWithVariants can produce null variant
-                ((Set<GATKVariant>) vs).add(v);
-            }
-            return vs;
-        }, (vs1, vs2) -> {
-            ((Set<GATKVariant>) vs1).addAll((Set<GATKVariant>) vs2);
-            return vs1;
-        });
-    }
-
-    private static JavaPairRDD<VariantShard, GATKRead> pairReadsWithVariantShards(final JavaRDD<GATKRead> reads) {
-        return reads.flatMapToPair(gatkRead -> {
-            List<VariantShard> shards = VariantShard.getVariantShardsFromInterval(gatkRead);
-            List<Tuple2<VariantShard, GATKRead>> out = Lists.newArrayList();
-            for (VariantShard shard : shards) {
-                out.add(new Tuple2<>(shard, gatkRead));
-            }
-            return out.iterator();
-        });
-    }
-
-    private static JavaPairRDD<VariantShard, GATKVariant> pairVariantsWithVariantShards(final JavaRDD<GATKVariant> variants) {
-        return variants.flatMapToPair(variant -> {
-            List<VariantShard> shards = VariantShard.getVariantShardsFromInterval(variant);
-            List<Tuple2<VariantShard, GATKVariant>> out = Lists.newArrayList();
-            for (VariantShard shard : shards) {
-                out.add(new Tuple2<>(shard, variant));
-            }
-            return out.iterator();
-        });
-    }
-
-    private static JavaPairRDD<GATKRead, GATKVariant> pairReadsWithVariants(final JavaPairRDD<VariantShard, GATKRead> readsWShards,
-                                                                            final JavaPairRDD<VariantShard, GATKVariant> variantsWShards) {
-        JavaPairRDD<VariantShard, Tuple2<Iterable<GATKRead>, Iterable<GATKVariant>>> cogroup = readsWShards.cogroup(variantsWShards);
-
-        return cogroup.flatMapToPair(cogroupValue -> {
-            Iterable<GATKRead> iReads = cogroupValue._2()._1();
-            Iterable<GATKVariant> iVariants = cogroupValue._2()._2();
-
-            List<Tuple2<GATKRead, GATKVariant>> out = Lists.newArrayList();
-            // For every read, find every overlapping variant.
-            for (GATKRead r : iReads) {
-                boolean foundVariants = false;
-                SimpleInterval interval = new SimpleInterval(r);
-                for (GATKVariant v : iVariants) {
-                    if (interval.overlaps(v)) {
-                        foundVariants = true;
-                        out.add(new Tuple2<>(r, v));
-                    }
-                }
-                // If no variants are found, we still want to output the read.
-                if (!foundVariants) {
-                    out.add(new Tuple2<>(r, null));
-                }
-            }
-            return out.iterator();
-        });
-    }
-}
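The `aggregateByKey` call above does the step-4 deduplication: fold values into a per-key set (dropping the `null` "no variant" marker), then merge sets across partitions. The same idiom in isolation, with types simplified to `String`/`Integer` for illustration:

```java
// pairs: JavaPairRDD<String, Integer>, possibly with duplicate values per key.
JavaPairRDD<String, LinkedHashSet<Integer>> grouped = pairs.aggregateByKey(
        new LinkedHashSet<Integer>(),                             // zero value per key
        (set, v) -> { if (v != null) set.add(v); return set; },   // fold within a partition
        (a, b)   -> { a.addAll(b); return a; });                  // merge across partitions
```

Using a set as the accumulator is what removes the duplicate (read, variant) pairs produced when a read and variant overlap in more than one shard.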
diff --git a/src/main/java/org/broadinstitute/hellbender/tools/HaplotypeCallerSpark.java b/src/main/java/org/broadinstitute/hellbender/tools/HaplotypeCallerSpark.java
index 7bba82ea65a..a4c0c67c8e3 100644
--- a/src/main/java/org/broadinstitute/hellbender/tools/HaplotypeCallerSpark.java
+++ b/src/main/java/org/broadinstitute/hellbender/tools/HaplotypeCallerSpark.java
@@ -16,6 +16,7 @@
 import org.broadinstitute.barclay.argparser.ArgumentCollection;
 import org.broadinstitute.barclay.argparser.BetaFeature;
 import org.broadinstitute.barclay.argparser.CommandLineProgramProperties;
+import org.broadinstitute.barclay.argparser.*;
 import org.broadinstitute.barclay.help.DocumentedFeature;
 import org.broadinstitute.hellbender.cmdline.StandardArgumentDefinitions;
 import org.broadinstitute.hellbender.cmdline.programgroups.ShortVariantDiscoveryProgramGroup;
@@ -23,8 +24,11 @@
 import org.broadinstitute.hellbender.engine.AssemblyRegionEvaluator;
 import org.broadinstitute.hellbender.engine.FeatureContext;
 import org.broadinstitute.hellbender.engine.ShardBoundary;
+import org.broadinstitute.hellbender.engine.*;
 import org.broadinstitute.hellbender.engine.filters.ReadFilter;
 import org.broadinstitute.hellbender.engine.spark.*;
+import org.broadinstitute.hellbender.engine.spark.GATKSparkTool;
+import org.broadinstitute.hellbender.engine.spark.SparkSharder;
 import org.broadinstitute.hellbender.engine.spark.datasources.VariantsSparkSink;
 import org.broadinstitute.hellbender.exceptions.UserException;
 import org.broadinstitute.hellbender.tools.walkers.annotator.Annotation;
@@ -37,11 +41,19 @@
 import org.broadinstitute.hellbender.utils.fasta.CachingIndexedFastaSequenceFile;
 import org.broadinstitute.hellbender.utils.io.IOUtils;
 import org.broadinstitute.hellbender.utils.read.GATKRead;
+import scala.Tuple2;
 
 import java.io.IOException;
 import java.nio.file.Path;
 import java.util.*;
 import java.util.function.Supplier;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * ********************************************************************************
diff --git a/src/main/java/org/broadinstitute/hellbender/tools/spark/transforms/bqsr/BaseRecalibratorEngineSparkWrapper.java b/src/main/java/org/broadinstitute/hellbender/tools/spark/transforms/bqsr/BaseRecalibratorEngineSparkWrapper.java
deleted file mode 100644
index fb3bb1dcec5..00000000000
--- a/src/main/java/org/broadinstitute/hellbender/tools/spark/transforms/bqsr/BaseRecalibratorEngineSparkWrapper.java
+++ /dev/null
@@ -1,93 +0,0 @@
-package org.broadinstitute.hellbender.tools.spark.transforms.bqsr;
-
-import htsjdk.samtools.SAMFileHeader;
-import htsjdk.samtools.SAMSequenceDictionary;
-import org.apache.spark.broadcast.Broadcast;
-import org.broadinstitute.hellbender.engine.*;
-import org.broadinstitute.hellbender.utils.gcs.BucketUtils;
-import org.broadinstitute.hellbender.utils.read.GATKRead;
-import org.broadinstitute.hellbender.utils.read.ReadUtils;
-import org.broadinstitute.hellbender.utils.recalibration.*;
-import org.broadinstitute.hellbender.utils.recalibration.covariates.StandardCovariateList;
-import org.broadinstitute.hellbender.utils.reference.ReferenceBases;
-import org.broadinstitute.hellbender.utils.variant.GATKVariant;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.PrintStream;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Iterator;
-
-/**
- * A lightweight wrapper over BaseRecalibrationEngine to make it easier to use from Spark.
- * Takes in reads + contextual data (overlapping reference bases and variants), spits out RecalibrationTables.
- */
-public final class BaseRecalibratorEngineSparkWrapper implements Serializable {
-    private static final long serialVersionUID = 1L;
-
-    private Broadcast<SAMFileHeader> headerBcast;
-    private transient SAMFileHeader header;
-    private Broadcast<SAMSequenceDictionary> referenceSequenceDictionaryBcast;
-    private transient SAMSequenceDictionary referenceSequenceDictionary;
-
-    private RecalibrationArgumentCollection recalArgs;
-
-    private BaseRecalibrationEngine recalibrationEngine;
-
-    // true when we are about to process the first element of a bundle.
-    private transient boolean initialized = false;
-
-
-    // -------------------------------------------------------------
-    // public functions (and constructors, those go first)
-
-    /**
-     * Takes in reads + contextual data (overlapping reference bases and variants), spits out RecalibrationTables.
-     */
-    public BaseRecalibratorEngineSparkWrapper(Broadcast<SAMFileHeader> headerBcast, Broadcast<SAMSequenceDictionary> referenceSequenceDictionaryBcast, RecalibrationArgumentCollection recalArgs) {
-        this.headerBcast = headerBcast;
-        this.referenceSequenceDictionaryBcast = referenceSequenceDictionaryBcast;
-        this.recalArgs = recalArgs;
-        if (this.recalArgs.FORCE_PLATFORM != null) {
-            this.recalArgs.DEFAULT_PLATFORM = this.recalArgs.FORCE_PLATFORM;
-        }
-    }
-
-    // saves to output
-    public static void saveTextualReport(String output, SAMFileHeader header, RecalibrationTables rt, RecalibrationArgumentCollection recalArgs) throws IOException {
-        OutputStream oStream = BucketUtils.createFile(output);
-        QuantizationInfo qi = new QuantizationInfo(rt, recalArgs.QUANTIZING_LEVELS);
-        if (recalArgs.FORCE_PLATFORM != null) {
-            recalArgs.DEFAULT_PLATFORM = recalArgs.FORCE_PLATFORM;
-        }
-        StandardCovariateList covariates = new StandardCovariateList(recalArgs, header);
-        try ( PrintStream reportStream = new PrintStream(oStream) ) {
-            RecalUtils.outputRecalibrationReport(reportStream, recalArgs, qi, rt, covariates);
-        }
-    }
-
-    public Iterator<RecalibrationTables> apply(Iterator<ContextShard> shards) throws Exception {
-        this.header = headerBcast.value();
-        this.referenceSequenceDictionary = referenceSequenceDictionaryBcast.value();
-        recalibrationEngine = new BaseRecalibrationEngine(recalArgs, header);
-        while (shards.hasNext()) {
-            ContextShard shard = shards.next();
-            for (int i = 0; i < shard.reads.size(); i++) {
-                final GATKRead read = shard.reads.get(i);
-                final ReadContextData rc = shard.readContext.get(i);
-                final Iterable<GATKVariant> variants = rc.getOverlappingVariants();
-                final ReferenceBases refBases = rc.getOverlappingReferenceBases();
-                final ReferenceDataSource refDS = new ReferenceMemorySource(refBases, referenceSequenceDictionary);
-                recalibrationEngine.processRead(read, refDS, variants);
-            }
-        }
-        ArrayList<RecalibrationTables> ret = new ArrayList<>();
-        ret.add(recalibrationEngine.getRecalibrationTables());
-        return ret.iterator();
-    }
-
-}
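The wrapper's `apply(Iterator<ContextShard>) -> Iterator<RecalibrationTables>` shape is designed to plug straight into `mapPartitions`. A sketch of how a BQSR pipeline would wire it up; the `shardsRDD` of ContextShards and any downstream reduce of the per-partition tables are assumed from the surrounding pipeline, not shown in this diff:

```java
// Hypothetical wiring sketch: one engine per partition, one table per partition.
BaseRecalibratorEngineSparkWrapper recal =
        new BaseRecalibratorEngineSparkWrapper(headerBcast, refDictBcast, recalArgs);
JavaRDD<RecalibrationTables> tables = shardsRDD.mapPartitions(recal::apply);
```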
diff --git a/src/test/java/org/broadinstitute/hellbender/engine/spark/AddContextDataToReadSparkUnitTest.java b/src/test/java/org/broadinstitute/hellbender/engine/spark/AddContextDataToReadSparkUnitTest.java
deleted file mode 100644
index 2f7cd214a08..00000000000
--- a/src/test/java/org/broadinstitute/hellbender/engine/spark/AddContextDataToReadSparkUnitTest.java
+++ /dev/null
@@ -1,114 +0,0 @@
-package org.broadinstitute.hellbender.engine.spark;
-
-import com.google.common.collect.Lists;
-import htsjdk.samtools.SAMRecord;
-import htsjdk.samtools.SAMSequenceDictionary;
-import htsjdk.samtools.SAMSequenceRecord;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.broadinstitute.hellbender.GATKBaseTest;
-import org.broadinstitute.hellbender.engine.ReadContextData;
-import org.broadinstitute.hellbender.engine.spark.datasources.ReferenceMultiSparkSource;
-import org.broadinstitute.hellbender.engine.spark.datasources.ReferenceWindowFunctions;
-import org.broadinstitute.hellbender.utils.KV;
-import org.broadinstitute.hellbender.utils.SerializableFunction;
-import org.broadinstitute.hellbender.utils.SimpleInterval;
-import org.broadinstitute.hellbender.utils.read.GATKRead;
-import org.broadinstitute.hellbender.utils.reference.ReferenceBases;
-import org.broadinstitute.hellbender.testutils.FakeReferenceSource;
-import org.broadinstitute.hellbender.utils.variant.GATKVariant;
-import org.testng.Assert;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-public class AddContextDataToReadSparkUnitTest extends GATKBaseTest {
-    @DataProvider(name = "bases")
-    public Object[][] bases() {
-        List<Class<?>> classes = Collections.singletonList(SAMRecord.class);
-        JoinStrategy[] strategies = JoinStrategy.values();
-        Object[][] data = new Object[classes.size() * strategies.length][];
-        for (int i = 0; i < classes.size(); ++i) {
-            Class<?> c = classes.get(i);
-            ReadsPreprocessingPipelineSparkTestData testData = new ReadsPreprocessingPipelineSparkTestData(c);
-
-            List<GATKRead> reads = testData.getReads();
-            List<GATKVariant> variantList = testData.getVariants();
-            List<KV<GATKRead, ReadContextData>> expectedReadContextData = testData.getKvReadContextData();
-            for (int j = 0; j < strategies.length; j++) {
-                data[i * strategies.length + j] = new Object[]{reads, variantList, expectedReadContextData, strategies[j]};
-            }
-        }
-        return data;
-    }
-
-    @Test(dataProvider = "bases", groups = "spark")
-    public void addContextDataTest(List<GATKRead> reads, List<GATKVariant> variantList,
-                                   List<KV<GATKRead, ReadContextData>> expectedReadContextData,
-                                   JoinStrategy joinStrategy) {
-        JavaSparkContext ctx = SparkContextFactory.getTestSparkContext();
-
-        JavaRDD<GATKRead> rddReads = ctx.parallelize(reads);
-        JavaRDD<GATKVariant> rddVariants = ctx.parallelize(variantList);
-
-        SAMSequenceDictionary sd = new SAMSequenceDictionary(Lists.newArrayList(new SAMSequenceRecord("1", 100000), new SAMSequenceRecord("2", 100000)));
-        JavaPairRDD<GATKRead, ReadContextData> rddActual = AddContextDataToReadSpark.add(ctx, rddReads,
-                new TestMultiReferenceSource(sd),
-                rddVariants, null, joinStrategy,
-                sd, 10000, 1000);
-
-        Map<GATKRead, ReadContextData> actual = rddActual.collectAsMap();
-
-        Assert.assertEquals(actual.size(), expectedReadContextData.size());
-        for (KV<GATKRead, ReadContextData> kv : expectedReadContextData) {
-            ReadContextData readContextData = actual.get(kv.getKey());
-            Assert.assertNotNull(readContextData);
-            Assert.assertTrue(CollectionUtils.isEqualCollection(Lists.newArrayList(readContextData.getOverlappingVariants()),
-                    Lists.newArrayList(kv.getValue().getOverlappingVariants())));
-            SimpleInterval minimalInterval = kv.getValue().getOverlappingReferenceBases().getInterval();
-            ReferenceBases subset = readContextData.getOverlappingReferenceBases().getSubset(minimalInterval);
-            Assert.assertEquals(subset, kv.getValue().getOverlappingReferenceBases());
-        }
-    }
-
-    // Provide a fake implementation of this class for testing. We can't use a real mock since this is used as a Spark
-    // broadcast variable.
-    // Mocks are mutated when they're accessed, which can result in ConcurrentModificationExceptions
-    // during serialization/broadcast.
-    static class TestMultiReferenceSource extends ReferenceMultiSparkSource implements Serializable {
-        private static final long serialVersionUID = 1L;
-
-        final SAMSequenceDictionary sequenceDictionary;
-
-        public TestMultiReferenceSource(final SAMSequenceDictionary sd) {
-            sequenceDictionary = sd;
-        }
-
-        @Override
-        public ReferenceBases getReferenceBases(final SimpleInterval interval) {
-            return FakeReferenceSource.bases(interval);
-        }
-
-        @Override
-        public SAMSequenceDictionary getReferenceSequenceDictionary(final SAMSequenceDictionary optReadSequenceDictionaryToMatch) {
-            return sequenceDictionary;
-        }
-
-        @Override
-        public boolean isCompatibleWithSparkBroadcast(){
-            return true;
-        }
-
-        @Override
-        public SerializableFunction<GATKRead, SimpleInterval> getReferenceWindowFunction() {
-            return ReferenceWindowFunctions.IDENTITY_FUNCTION;
-        }
-
-    }
-
-}
\ No newline at end of file
diff --git a/src/test/java/org/broadinstitute/hellbender/engine/spark/JoinReadsWithRefBasesSparkUnitTest.java b/src/test/java/org/broadinstitute/hellbender/engine/spark/JoinReadsWithRefBasesSparkUnitTest.java
deleted file mode 100644
index 347c278ff00..00000000000
--- a/src/test/java/org/broadinstitute/hellbender/engine/spark/JoinReadsWithRefBasesSparkUnitTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-package org.broadinstitute.hellbender.engine.spark;
-
-import htsjdk.samtools.SAMRecord;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.broadinstitute.hellbender.GATKBaseTest;
-import org.broadinstitute.hellbender.engine.spark.datasources.ReferenceMultiSparkSource;
-import org.broadinstitute.hellbender.engine.spark.datasources.ReferenceWindowFunctions;
-import org.broadinstitute.hellbender.utils.KV;
-import org.broadinstitute.hellbender.utils.SimpleInterval;
-import org.broadinstitute.hellbender.utils.read.GATKRead;
-import org.broadinstitute.hellbender.utils.reference.ReferenceBases;
-import org.broadinstitute.hellbender.testutils.FakeReferenceSource;
-import org.testng.Assert;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import static org.mockito.Mockito.*;
-
-public class JoinReadsWithRefBasesSparkUnitTest extends GATKBaseTest {
-    @DataProvider(name = "bases")
-    public Object[][] bases(){
-        List<Class<?>> classes = Collections.singletonList(SAMRecord.class);
-        Object[][] data = new Object[classes.size()][];
-        for (int i = 0; i < classes.size(); ++i) {
-            Class<?> c = classes.get(i);
-            ReadsPreprocessingPipelineSparkTestData testData = new ReadsPreprocessingPipelineSparkTestData(c);
-
-            List<GATKRead> reads = testData.getReads();
-            List<SimpleInterval> intervals = testData.getAllIntervals();
-            List<KV<GATKRead, ReferenceBases>> kvReadRefBases = testData.getKvReadsRefBases();
-            data[i] = new Object[]{reads, kvReadRefBases, intervals};
-        }
-        return data;
-    }
-
-    @Test(dataProvider = "bases", groups = "spark")
-    public void refBasesShuffleTest(List<GATKRead> reads, List<KV<GATKRead, ReferenceBases>> kvReadRefBases,
-                                    List<SimpleInterval> intervals) throws IOException {
-        JavaSparkContext ctx = SparkContextFactory.getTestSparkContext();
-
-        JavaRDD<GATKRead> rddReads = ctx.parallelize(reads);
-
-        ReferenceMultiSparkSource mockSource = mock(ReferenceMultiSparkSource.class, withSettings().serializable());
-        for (SimpleInterval i : intervals) {
-            when(mockSource.getReferenceBases(eq(i))).thenReturn(FakeReferenceSource.bases(i));
-        }
-        when(mockSource.getReferenceWindowFunction()).thenReturn(ReferenceWindowFunctions.IDENTITY_FUNCTION);
-
-        JavaPairRDD<GATKRead, ReferenceBases> rddResult = ShuffleJoinReadsWithRefBases.addBases(mockSource, rddReads);
-        Map<GATKRead, ReferenceBases> result = rddResult.collectAsMap();
-
-        for (KV<GATKRead, ReferenceBases> kv : kvReadRefBases) {
-            ReferenceBases referenceBases = result.get(kv.getKey());
-            Assert.assertNotNull(referenceBases);
-            Assert.assertEquals(kv.getValue(), referenceBases);
-        }
-    }
-
-    @Test(dataProvider = "bases", groups = "spark")
-    public void refBasesBroadcastTest(List<GATKRead> reads, List<KV<GATKRead, ReferenceBases>> kvReadRefBases,
-                                      List<SimpleInterval> intervals) throws IOException {
-        JavaSparkContext ctx = SparkContextFactory.getTestSparkContext();
-
-        JavaRDD<GATKRead> rddReads = ctx.parallelize(reads);
-
-        ReferenceMultiSparkSource mockSource = mock(ReferenceMultiSparkSource.class, withSettings().serializable());
-        for (SimpleInterval i : intervals) {
-            when(mockSource.getReferenceBases(eq(i))).thenReturn(FakeReferenceSource.bases(i));
-        }
-        when(mockSource.getReferenceWindowFunction()).thenReturn(ReferenceWindowFunctions.IDENTITY_FUNCTION);
-
-        JavaPairRDD<GATKRead, ReferenceBases> rddResult = BroadcastJoinReadsWithRefBases.addBases(mockSource, rddReads);
-        Map<GATKRead, ReferenceBases> result = rddResult.collectAsMap();
-
-        for (KV<GATKRead, ReferenceBases> kv : kvReadRefBases) {
-            ReferenceBases referenceBases = result.get(kv.getKey());
-            Assert.assertNotNull(referenceBases);
-            Assert.assertEquals(kv.getValue(), referenceBases);
-        }
-    }
-}
\ No newline at end of file
diff --git a/src/test/java/org/broadinstitute/hellbender/engine/spark/JoinReadsWithVariantsSparkUnitTest.java b/src/test/java/org/broadinstitute/hellbender/engine/spark/JoinReadsWithVariantsSparkUnitTest.java
deleted file mode 100644
index 45de109dcc9..00000000000
--- a/src/test/java/org/broadinstitute/hellbender/engine/spark/JoinReadsWithVariantsSparkUnitTest.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package org.broadinstitute.hellbender.engine.spark;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import htsjdk.samtools.SAMRecord;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.broadinstitute.hellbender.GATKBaseTest;
-import org.broadinstitute.hellbender.utils.KV;
-import org.broadinstitute.hellbender.utils.read.GATKRead;
-import org.broadinstitute.hellbender.utils.variant.GATKVariant;
-import org.testng.Assert;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import java.util.*;
-
-public class JoinReadsWithVariantsSparkUnitTest extends GATKBaseTest {
-    @DataProvider(name = "pairedReadsAndVariants")
-    public Object[][] pairedReadsAndVariants(){
-        List<Object[]> testCases = new ArrayList<>();
-
-        for ( JoinStrategy joinStrategy : JoinStrategy.values() ) {
-            for ( Class<?> readImplementation : Collections.singletonList(SAMRecord.class)) {
-                ReadsPreprocessingPipelineSparkTestData testData = new ReadsPreprocessingPipelineSparkTestData(readImplementation);
-                List<GATKRead> reads = testData.getReads();
-                List<GATKVariant> variantList = testData.getVariants();
-                List<KV<GATKRead, Iterable<GATKVariant>>> kvReadiVariant = testData.getKvReadiVariant();
-
-                testCases.add(new Object[]{reads, variantList, kvReadiVariant, joinStrategy});
-            }
-        }
-        return testCases.toArray(new Object[][]{});
-    }
-
-    @Test(dataProvider = "pairedReadsAndVariants", groups = "spark")
-    public void pairReadsAndVariantsTest(List<GATKRead> reads, List<GATKVariant> variantList, List<KV<GATKRead, Iterable<GATKVariant>>> kvReadiVariant, JoinStrategy joinStrategy) {
-        JavaSparkContext ctx = SparkContextFactory.getTestSparkContext();
-
-        JavaRDD<GATKRead> rddReads = ctx.parallelize(reads);
-        JavaRDD<GATKVariant> rddVariants = ctx.parallelize(variantList);
-        JavaPairRDD<GATKRead, Iterable<GATKVariant>> actual = joinStrategy == JoinStrategy.SHUFFLE ?
-                ShuffleJoinReadsWithVariants.join(rddReads, rddVariants) :
-                BroadcastJoinReadsWithVariants.join(rddReads, rddVariants);
-        Map<GATKRead, Iterable<GATKVariant>> gatkReadIterableMap = actual.collectAsMap();
-
-        Assert.assertEquals(gatkReadIterableMap.size(), kvReadiVariant.size());
-        for (KV<GATKRead, Iterable<GATKVariant>> kv : kvReadiVariant) {
-            List<GATKVariant> variants = Lists.newArrayList(gatkReadIterableMap.get(kv.getKey()));
-            Assert.assertTrue(variants.stream().noneMatch(v -> v == null));
-            HashSet<GATKVariant> hashVariants = new LinkedHashSet<>(variants);
-            final Iterable<GATKVariant> iVariants = kv.getValue();
-            HashSet<GATKVariant> expectedHashVariants = Sets.newLinkedHashSet(iVariants);
-            Assert.assertEquals(hashVariants, expectedHashVariants);
-        }
-    }
-}
\ No newline at end of file
diff --git a/src/test/java/org/broadinstitute/hellbender/tools/HaplotypeCallerSparkIntegrationTest.java b/src/test/java/org/broadinstitute/hellbender/tools/HaplotypeCallerSparkIntegrationTest.java
index 2c07cb1e1ae..445bdc6db3c 100644
--- a/src/test/java/org/broadinstitute/hellbender/tools/HaplotypeCallerSparkIntegrationTest.java
+++ b/src/test/java/org/broadinstitute/hellbender/tools/HaplotypeCallerSparkIntegrationTest.java
@@ -21,7 +21,6 @@
 import org.testng.annotations.Test;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.Collections;
 
 import org.broadinstitute.hellbender.tools.walkers.haplotypecaller.HaplotypeCallerIntegrationTest;