Skip to content

Commit

Permalink
Calculating coverage for each input bam in parallel on the thread poo…
Browse files Browse the repository at this point in the history
…l instead of on the main thread. This significantly improves variant annotation speed for large jobs with many input files.
  • Loading branch information
d-cameron committed Jan 20, 2017
1 parent 3baf198 commit 367498e
Show file tree
Hide file tree
Showing 9 changed files with 95 additions and 93 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<groupId>au.edu.wehi</groupId>
<artifactId>gridss</artifactId>
<packaging>jar</packaging>
<version>1.0.4.8-SNAPSHOT</version>
<version>1.1.0</version>
<name>gridss</name>
<url>https://github.com/PapenfussLab/gridss</url>
<properties>
Expand Down

This file was deleted.

9 changes: 3 additions & 6 deletions src/main/java/au/edu/wehi/idsv/ReferenceCoverageLookup.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,13 @@ public interface ReferenceCoverageLookup {
* @param position position immediate before putative breakend
* @return number of reads spanning the putative breakend
*/
public abstract int readsSupportingNoBreakendAfter(int referenceIndex,
int position);

int readsSupportingNoBreakendAfter(int referenceIndex, int position);
/**
* Number of read pairs providing evidence against a breakend immediately after the given base
* @param referenceIndex contig
* @param position position immediate before putative breakend
* @return number of read pairs spanning the putative breakend
*/
public abstract int readPairsSupportingNoBreakendAfter(int referenceIndex,
int position);

int readPairsSupportingNoBreakendAfter(int referenceIndex, int position);
int getCategory();
}
2 changes: 1 addition & 1 deletion src/main/java/au/edu/wehi/idsv/SAMEvidenceSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ public static CloseableIterator<DirectedEvidence> mergedIterator(final List<SAME
/**
* Maximum distance between the SAM alignment location of evidence, and the extrema of the
* breakend position supported by that evidence.
* @return maximum order-of-order distance between evidence ordered by SAM alignment position and the breakend start position
* @return maximum out-of-order distance between evidence ordered by SAM alignment position and the breakend start position
*/
public static int maximumWindowSize(ProcessingContext context, List<SAMEvidenceSource> sources, AssemblyEvidenceSource assembly) {
int maxSize = 0;
Expand Down
116 changes: 68 additions & 48 deletions src/main/java/au/edu/wehi/idsv/SequentialCoverageAnnotator.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.IntStream;

import com.google.common.collect.Lists;

import au.edu.wehi.idsv.util.AsyncBufferedIterator;
import htsjdk.samtools.SAMFileHeader.SortOrder;
import htsjdk.samtools.SAMRecord;
Expand All @@ -23,6 +24,7 @@
/**
* Annotates breakends with reference allele coverage information
*
*
* @author Daniel Cameron
*
*/
Expand All @@ -32,69 +34,87 @@ public class SequentialCoverageAnnotator<T extends VariantContextDirectedEvidenc
private final List<ReferenceCoverageLookup> reference;
private final Iterator<T> it;
private final List<Closeable> toclose = new ArrayList<>();
public SequentialCoverageAnnotator(ProcessingContext context, List<SAMEvidenceSource> sources, Iterator<T> it) {
int windowSize = SAMEvidenceSource.maximumWindowSize(context, sources, null);
List<List<SAMEvidenceSource>> byCategory = Lists.newArrayList();
while (byCategory.size() < context.getCategoryCount()) {
byCategory.add(new ArrayList<SAMEvidenceSource>());
}
for (SAMEvidenceSource s : sources) {
byCategory.get(s.getSourceCategory()).add(s);
}
List<ReferenceCoverageLookup> coverage = Lists.newArrayList();
for (int i = 0; i < byCategory.size(); i++) {
List<ReferenceCoverageLookup> lookup = Lists.newArrayList();
for (SAMEvidenceSource s : byCategory.get(i)) {
// one read-ahead thread per input file
SamReader reader = SamReaderFactory.makeDefault().open(s.getFile());
SAMRecordIterator rawIterator = reader.iterator();
rawIterator.assertSorted(SortOrder.coordinate);
CloseableIterator<SAMRecord> sit = new AsyncBufferedIterator<SAMRecord>(rawIterator, s.getFile().getName() + "-Coverage");
toclose.add(sit); // close the async iterator first to prevent aysnc reading from a closed stream
toclose.add(rawIterator);
toclose.add(reader);
sit = new ProgressLoggingSAMRecordIterator(sit, new ProgressLogger(log));
SequentialReferenceCoverageLookup sourceLookup = new SequentialReferenceCoverageLookup(sit, s.getMetrics().getIdsvMetrics(), s.getReadPairConcordanceCalculator(), windowSize);
context.registerBuffer(s.getFile().getName(), sourceLookup);
lookup.add(sourceLookup);
}
coverage.add(new AggregateReferenceCoverageLookup(lookup));
}
private final ExecutorService threadpool;
public SequentialCoverageAnnotator(ProcessingContext context, List<SAMEvidenceSource> sources, Iterator<T> it, ExecutorService threadpool) {
this.context = context;
this.reference = coverage;
this.reference = createLookup(context, sources);
this.it = it;

this.threadpool = threadpool;
}
private List<ReferenceCoverageLookup> createLookup(ProcessingContext context, List<SAMEvidenceSource> sources) {
int windowSize = SAMEvidenceSource.maximumWindowSize(context, sources, null);
List<ReferenceCoverageLookup> result = new ArrayList<>();
for (SAMEvidenceSource ses : sources) {
assert(ses.getSourceCategory() >= 0);
assert(ses.getSourceCategory() < context.getCategoryCount());
// one read-ahead thread per input file
SamReader reader = SamReaderFactory.makeDefault().open(ses.getFile());
SAMRecordIterator rawIterator = reader.iterator();
rawIterator.assertSorted(SortOrder.coordinate);
CloseableIterator<SAMRecord> sit = new AsyncBufferedIterator<SAMRecord>(rawIterator, ses.getFile().getName() + "-Coverage");
toclose.add(sit); // close the async iterator first to prevent aysnc reading from a closed stream
toclose.add(rawIterator);
toclose.add(reader);
sit = new ProgressLoggingSAMRecordIterator(sit, new ProgressLogger(log));
SequentialReferenceCoverageLookup sourceLookup = new SequentialReferenceCoverageLookup(sit, ses.getMetrics().getIdsvMetrics(), ses.getReadPairConcordanceCalculator(), windowSize, ses.getSourceCategory());
context.registerBuffer(ses.getFile().getName(), sourceLookup);
result.add(sourceLookup);
}
return result;
}
public SequentialCoverageAnnotator(
ProcessingContext context,
Iterator<T> it,
List<ReferenceCoverageLookup> reference) {
List<ReferenceCoverageLookup> reference,
ExecutorService threadpool) {
this.it = it;
this.context = context;
this.reference = reference;
this.threadpool = threadpool;
}
private static class CoverageResult {
public CoverageResult(int reads, int spans) {
this.readsSupportingNoBreakendAfter = reads;
this.readPairsSupportingNoBreakendAfter = spans;
}
public final int readsSupportingNoBreakendAfter;
public final int readPairsSupportingNoBreakendAfter;
}
private static CoverageResult calculateCoverage(ReferenceCoverageLookup lookup, int referenceIndex, int start, int end) {
int reads = IntStream.range(start, end)
.map(p -> lookup.readsSupportingNoBreakendAfter(referenceIndex, p))
.min().getAsInt();
int spans = IntStream.range(start, end)
.map(p -> lookup.readPairsSupportingNoBreakendAfter(referenceIndex, p))
.min().getAsInt();
return new CoverageResult(reads, spans);
}
@SuppressWarnings("unchecked")
public T annotate(T variant) {
BreakendSummary loc = variant.getBreakendSummary();
int referenceIndex = loc.referenceIndex;
int offset = loc.direction == BreakendDirection.Forward ? 0 : -1;
int[] reads = new int[reference.size()];
int[] spans = new int[reference.size()];
for (int i = 0; i < reference.size(); i++) {
ReferenceCoverageLookup r = reference.get(i);
if (r != null) {
reads[i] = IntStream.range(loc.start + offset, loc.end + 1 + offset)
.map(p -> r.readsSupportingNoBreakendAfter(referenceIndex, p))
.min().getAsInt();
spans[i] = IntStream.range(loc.start + offset, loc.end + 1 + offset)
.map(p -> r.readPairsSupportingNoBreakendAfter(referenceIndex, p))
.min().getAsInt();
int start = loc.start + offset;
int end = loc.end + 1 + offset;
List<Future<CoverageResult>> tasks = new ArrayList<>();
for (ReferenceCoverageLookup rcl : reference) {
tasks.add(threadpool.submit(() -> calculateCoverage(rcl, referenceIndex, start, end)));
}
try {
int[] reads = new int[reference.size()];
int[] spans = new int[reference.size()];
for (int i = 0; i < reference.size(); i++) {
ReferenceCoverageLookup rcl = reference.get(i);
reads[rcl.getCategory()] += tasks.get(i).get().readsSupportingNoBreakendAfter;
spans[rcl.getCategory()] += tasks.get(i).get().readPairsSupportingNoBreakendAfter;
}
IdsvVariantContextBuilder builder = new IdsvVariantContextBuilder(context, variant);
builder.referenceReads(reads);
builder.referenceSpanningPairs(spans);
return (T)builder.make();
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
IdsvVariantContextBuilder builder = new IdsvVariantContextBuilder(context, variant);
builder.referenceReads(reads);
builder.referenceSpanningPairs(spans);
return (T)builder.make();
}
@Override
public boolean hasNext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import htsjdk.samtools.filter.AggregateFilter;
import htsjdk.samtools.filter.AlignedFilter;
import htsjdk.samtools.filter.DuplicateReadFilter;
import htsjdk.samtools.filter.FilteringIterator;
import htsjdk.samtools.filter.FilteringSamIterator;

/**
* Counts the number of reads and read pairs providing support for the
Expand All @@ -28,6 +28,7 @@
*
*/
public class SequentialReferenceCoverageLookup implements Closeable, ReferenceCoverageLookup, TrackedBuffer {
private final int category;
private final List<Closeable> toClose = Lists.newArrayList();
private final PeekingIterator<SAMRecord> reads;
private final ReadPairConcordanceCalculator pairing;
Expand All @@ -54,12 +55,13 @@ public class SequentialReferenceCoverageLookup implements Closeable, ReferenceCo
* @param maxFragmentSize maximum fragment
* @param reads reads to process. <b>Must</b> be coordinate sorted
*/
public SequentialReferenceCoverageLookup(Iterator<SAMRecord> it, IdsvMetrics metrics, ReadPairConcordanceCalculator pairing, int windowSize) {
public SequentialReferenceCoverageLookup(Iterator<SAMRecord> it, IdsvMetrics metrics, ReadPairConcordanceCalculator pairing, int windowSize, int category) {
this.pairing = pairing;
if (it instanceof Closeable) toClose.add((Closeable)it);
this.reads = Iterators.peekingIterator(new FilteringIterator(it, new AggregateFilter(ImmutableList.of(new AlignedFilter(true), new DuplicateReadFilter()))));
this.reads = Iterators.peekingIterator(new FilteringSamIterator(it, new AggregateFilter(ImmutableList.of(new AlignedFilter(true), new DuplicateReadFilter()))));
this.largestWindow = windowSize;
this.maxEvidenceWindow = Math.max(metrics.MAX_READ_LENGTH, Math.max(metrics.MAX_READ_MAPPED_LENGTH, pairing != null ? pairing.maxConcordantFragmentSize() : 0));
this.category = category;
}
public void close() {
for (Closeable c : toClose) {
Expand Down Expand Up @@ -202,4 +204,8 @@ public List<NamedTrackedBuffer> currentTrackedBufferSizes() {
new NamedTrackedBuffer(trackedBufferName_currentEndReferencePairs, currentEndReferencePairs.size())
);
}
@Override
public int getCategory() {
return category;
}
}
2 changes: 1 addition & 1 deletion src/main/java/gridss/AnnotateReferenceCoverage.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@
public class AnnotateReferenceCoverage extends VcfTransformCommandLineProgram {
@Override
public CloseableIterator<VariantContextDirectedBreakpoint> iterator(CloseableIterator<VariantContextDirectedBreakpoint> calls, ExecutorService threadpool) {
return new SequentialCoverageAnnotator<VariantContextDirectedBreakpoint>(getContext(), getSamEvidenceSources(), calls);
return new SequentialCoverageAnnotator<VariantContextDirectedBreakpoint>(getContext(), getSamEvidenceSources(), calls, threadpool);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,22 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;

import htsjdk.samtools.SAMRecord;
import htsjdk.samtools.SAMRecordCoordinateComparator;



public class SequentialCoverageAnnotatorTest extends TestHelper {
@SuppressWarnings("resource")
public VariantContextDirectedEvidence go(List<SAMRecord> ref, VariantContextDirectedEvidence toAnnotate) {
Collections.sort(ref, new SAMRecordCoordinateComparator());
return new SequentialCoverageAnnotator(
return new SequentialCoverageAnnotator<VariantContextDirectedEvidence>(
getContext(),
ImmutableList.of(toAnnotate).iterator(),
Lists.<ReferenceCoverageLookup>newArrayList(new SequentialReferenceCoverageLookup(ref.iterator(), IDSV(ref), new SAMFlagReadPairConcordanceCalculator(IDSV(ref)), 1024)))
Lists.<ReferenceCoverageLookup>newArrayList(new SequentialReferenceCoverageLookup(ref.iterator(), IDSV(ref), new SAMFlagReadPairConcordanceCalculator(IDSV(ref)), 1024, 0)),
MoreExecutors.newDirectExecutorService())
.annotate(toAnnotate);
}
@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

Expand All @@ -14,7 +15,12 @@
public class SequentialReferenceCoverageLookupTest extends TestHelper {
public ReferenceCoverageLookup init(List<SAMRecord> reads, int windowSize) {
Collections.sort(reads, new SAMRecordCoordinateComparator());
return new SequentialReferenceCoverageLookup(reads.iterator(), IDSV(reads), new SAMFlagReadPairConcordanceCalculator(IDSV(reads)), windowSize);
return new SequentialReferenceCoverageLookup(reads.iterator(), IDSV(reads), new SAMFlagReadPairConcordanceCalculator(IDSV(reads)), windowSize, 5);
}
@Test
public void should_report_correct_category() {
ReferenceCoverageLookup lookup = init(new ArrayList<SAMRecord>(), 1);
assertEquals(5, lookup.getCategory());
}
@Test
public void readsPairsSupportingNoBreakendAfter_should_return_first_read_in_pair() {
Expand Down

0 comments on commit 367498e

Please sign in to comment.