Bypass FeatureReader for GenomicsDBImport #7393

Merged: 4 commits, Oct 27, 2021
@@ -6,6 +6,7 @@
import htsjdk.samtools.util.Interval;
import htsjdk.samtools.util.IntervalList;
import htsjdk.samtools.util.Locatable;
import htsjdk.samtools.util.FileExtensions;
import htsjdk.tribble.AbstractFeatureReader;
import htsjdk.tribble.CloseableTribbleIterator;
import htsjdk.tribble.FeatureReader;
@@ -202,6 +203,7 @@ public final class GenomicsDBImport extends GATKTool {
public static final String VCF_INITIALIZER_THREADS_LONG_NAME = "reader-threads";
public static final String MAX_NUM_INTERVALS_TO_IMPORT_IN_PARALLEL = "max-num-intervals-to-import-in-parallel";
public static final String MERGE_CONTIGS_INTO_NUM_PARTITIONS = "merge-contigs-into-num-partitions";
public static final String BYPASS_FEATURE_READER = "bypass-feature-reader";
public static final int INTERVAL_LIST_SIZE_WARNING_THRESHOLD = 100;
public static final int ARRAY_COLUMN_BOUNDS_START = 0;
public static final int ARRAY_COLUMN_BOUNDS_END = 1;
@@ -348,6 +350,13 @@ public final class GenomicsDBImport extends GATKTool {
optional = true)
private boolean sharedPosixFSOptimizations = false;

@Argument(fullName = BYPASS_FEATURE_READER,
doc = "Use htslib to read input VCFs instead of GATK's FeatureReader. This will reduce memory usage and potentially speed up " +
"the import. Lower memory requirements may also enable parallelism through " + MAX_NUM_INTERVALS_TO_IMPORT_IN_PARALLEL +
". To enable this option, VCFs must be normalized, block-compressed and indexed.",
optional = true)
private boolean bypassFeatureReader = false;
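// Usage sketch (illustrative, not part of this diff): the argument's full name maps to
// --bypass-feature-reader on the GATK command line, and the integration tests below toggle it
// programmatically with args.add(GenomicsDBImport.BYPASS_FEATURE_READER, useNativeReader).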

@Argument(fullName = USE_GCS_HDFS_CONNECTOR,
doc = "Use the GCS HDFS Connector instead of the native GCS SDK client with gs:// URLs.",
optional = true)
@@ -502,6 +511,15 @@ private static void assertIntervalsCoverEntireContigs(GenomicsDBImporter importe
}
}

private static void assertVariantFileIsCompressedAndIndexed(final Path path) {
if (!path.toString().toLowerCase().endsWith(FileExtensions.COMPRESSED_VCF)) {
throw new UserException("Input variant files must be block compressed vcfs when using " +
BYPASS_FEATURE_READER + ", but " + path.toString() + " does not appear to be");
}
Path indexPath = path.resolveSibling(path.getFileName() + FileExtensions.COMPRESSED_VCF_INDEX);
IOUtils.assertFileIsReadable(indexPath);
}
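// Illustrative note (example file names assumed, not from this change): for an input such as
// sample1.g.vcf.gz, the check above requires the name to end in FileExtensions.COMPRESSED_VCF
// (".vcf.gz") and a readable sibling index sample1.g.vcf.gz.tbi, derived here from
// FileExtensions.COMPRESSED_VCF_INDEX (".tbi"), to sit next to it.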

/**
* sets the values of mergedHeaderLines, mergedHeaderSequenceDictionary, and sampleNameToVcfPath
*/
@@ -512,6 +530,9 @@ private void initializeHeaderAndSampleMappings() {
final List<VCFHeader> headers = new ArrayList<>(variantPaths.size());
for (final String variantPathString : variantPaths) {
final Path variantPath = IOUtils.getPath(variantPathString);
if (bypassFeatureReader) {
assertVariantFileIsCompressedAndIndexed(variantPath);
Collaborator:

In your testing, did you find that these extra checks for whether the inputs are block-compressed and indexed added significantly to the runtime when dealing with remote files?

Contributor (author):

We haven't done a lot of remote testing -- just sanity tests to ensure that they work. In the small remote cases we've tried the native reader is actually slower, but I haven't dug into it to see where the bottleneck is (potentially tweaking buffer sizes, etc). As I mentioned in the PR, that is something we were hoping to explore with Broad.

}
final VCFHeader header = getHeaderFromPath(variantPath);
Utils.validate(header != null, "Null header was found in " + variantPath + ".");
assertGVCFHasOnlyOneSample(variantPathString, header);
@@ -539,7 +560,7 @@ private void initializeHeaderAndSampleMappings() {
//it's VERY IMPORTANT that this map is Sorted according to String's natural ordering, if it is not
//the resulting database will have incorrect sample names
//see https://github.com/broadinstitute/gatk/issues/3682 for more information
sampleNameToVcfPath = loadSampleNameMapFileInSortedOrder(IOUtils.getPath(sampleNameMapFile));
sampleNameToVcfPath = loadSampleNameMapFileInSortedOrder(IOUtils.getPath(sampleNameMapFile), bypassFeatureReader);
final Path firstHeaderPath = IOUtils.getPath(sampleNameToVcfPath.entrySet().iterator().next().getValue().toString());
final VCFHeader header = getHeaderFromPath(firstHeaderPath);
//getMetaDataInInputOrder() returns an ImmutableSet - LinkedHashSet is mutable and preserves ordering
@@ -607,6 +628,11 @@ private static void assertGVCFHasOnlyOneSample(final String variantPath, final V
*/
@VisibleForTesting
static LinkedHashMap<String, URI> loadSampleNameMapFile(final Path sampleToFileMapPath) {
return loadSampleNameMapFile(sampleToFileMapPath, false);
}

private static LinkedHashMap<String, URI> loadSampleNameMapFile(final Path sampleToFileMapPath,
final boolean checkVcfIsCompressedAndIndexed) {
try {
final List<String> lines = Files.readAllLines(sampleToFileMapPath);
if (lines.isEmpty()) {
@@ -631,6 +657,9 @@ static LinkedHashMap<String, URI> loadSampleNameMapFile(final Path sampleToFileM
if (oldPath != null){
throw new UserException.BadInput("Found two mappings for the same sample: " + sample + "\n" + path + "\n" + oldPath );
}
if (checkVcfIsCompressedAndIndexed) {
assertVariantFileIsCompressedAndIndexed(IOUtils.getPath(path));
}
}
catch(final URISyntaxException e) {
throw new UserException("Malformed URI "+e.toString());
@@ -652,10 +681,12 @@ static LinkedHashMap<String, URI> loadSampleNameMapFile(final Path sampleToFileM
*
* The sample names must be unique.
* @param sampleToFileMapPath path to the mapping file
* @param checkVcfIsCompressedAndIndexed boolean indicating whether to check vcf is compressed and indexed
* @return map of sample name to corresponding file, sorted by sample name
*/
public static SortedMap<String, URI> loadSampleNameMapFileInSortedOrder(final Path sampleToFileMapPath){
return new TreeMap<>(loadSampleNameMapFile(sampleToFileMapPath));
public static SortedMap<String, URI> loadSampleNameMapFileInSortedOrder(final Path sampleToFileMapPath,
final boolean checkVcfIsCompressedAndIndexed){
return new TreeMap<>(loadSampleNameMapFile(sampleToFileMapPath, checkVcfIsCompressedAndIndexed));
}
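// Sketch of the expected sample map contents (illustrative, not from this change): one
// tab-separated entry per line, e.g. "Sample1<TAB>/path/to/Sample1.g.vcf.gz"; when
// checkVcfIsCompressedAndIndexed is true, each listed path is additionally validated by
// assertVariantFileIsCompressedAndIndexed before import.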

/**
@@ -831,7 +862,8 @@ private ImportConfig createImportConfig(final int batchSize) {
importConfigurationBuilder.setConsolidateTiledbArrayAfterLoad(doConsolidation);
importConfigurationBuilder.setEnableSharedPosixfsOptimizations(sharedPosixFSOptimizations);
ImportConfig importConfig = new ImportConfig(importConfigurationBuilder.build(), validateSampleToReaderMap, true,
batchSize, mergedHeaderLines, sampleNameToVcfPath, this::createSampleToReaderMap, doIncrementalImport);
batchSize, mergedHeaderLines, sampleNameToVcfPath, bypassFeatureReader ? null : this::createSampleToReaderMap,
doIncrementalImport);
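// Note (interpretation, hedged): passing null here instead of this::createSampleToReaderMap
// appears to be what lets the GenomicsDB importer open the inputs itself via htslib rather
// than through GATK's FeatureReader.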
importConfig.setOutputCallsetmapJsonFile(callsetMapJSONFile);
importConfig.setOutputVidmapJsonFile(vidMapJSONFile);
importConfig.setOutputVcfHeaderFile(vcfHeaderFile);
@@ -203,6 +203,11 @@ public void testGenomicsDBImportFileInputs() throws IOException {
testGenomicsDBImporter(LOCAL_GVCFS, INTERVAL, COMBINED, b38_reference_20_21, true, 1);
}

@Test
public void testGenomicsDBImportFileInputsNativeReader() throws IOException {
testGenomicsDBImporter(LOCAL_GVCFS, INTERVAL, COMBINED, b38_reference_20_21, true, 1, true);
}

@Test
public void testGenomicsDBImportFileInputs_newMQ() throws IOException {
testGenomicsDBImporter_newMQ(GVCFS_WITH_NEW_MQ, INTERVAL2, COMBINED_WITH_NEW_MQ, b37_reference_20_21, true, Collections.emptyList());
@@ -213,6 +218,11 @@ public void testGenomicsDBImportFileInputsWithMultipleIntervals() throws IOExcep
testGenomicsDBImporter(LOCAL_GVCFS, MULTIPLE_INTERVALS, COMBINED_MULTI_INTERVAL, b38_reference_20_21, true, 1);
}

@Test
public void testGenomicsDBImportFileInputsWithMultipleIntervalsNativeReader() throws IOException {
testGenomicsDBImporter(LOCAL_GVCFS, MULTIPLE_INTERVALS, COMBINED_MULTI_INTERVAL, b38_reference_20_21, true, 1, true);
}

@Test(timeOut = 1000000)
public void testGenomicsDBImportWith1000IntervalsToBeMerged() throws IOException {
final String workspace = createTempDir("genomicsdb-tests-").getAbsolutePath() + "/workspace";
@@ -235,6 +245,11 @@ public void testGenomicsDBImportFileInputsAgainstCombineGVCFMergeContigsToSingle
testGenomicsDBAgainstCombineGVCFs(LOCAL_GVCFS, INTERVAL_20_21, b38_reference_20_21, new String[0], 1, 1, false);
}

@Test
public void testGenomicsDBImportFileInputsAgainstCombineGVCFNativeReader() throws IOException {
testGenomicsDBAgainstCombineGVCFs(LOCAL_GVCFS, INTERVAL, b38_reference_20_21, new String[0], 1, 0, true);
}

@Test
public void testGenomicsDBImportMergeContigsManyNonAdjacentContigsToSeveralContigs() throws IOException {
List<SimpleInterval> manyContigs = MANY_CONTIGS_NON_ADJACENT_INTERVALS.stream().map(SimpleInterval::new).collect(Collectors.toList());
@@ -256,18 +271,35 @@ public void testGenomicsDBImportFileInputsAgainstCombineGVCFWithMultipleInterval
testGenomicsDBAgainstCombineGVCFs(LOCAL_GVCFS, MULTIPLE_INTERVALS_THAT_WORK_WITH_COMBINE_GVCFS, b38_reference_20_21, new String[0]);
}

@Test
public void testGenomicsDBImportFileInputsAgainstCombineGVCFWithMultipleIntervalsNativeReader() throws IOException {
testGenomicsDBAgainstCombineGVCFs(LOCAL_GVCFS, MULTIPLE_INTERVALS_THAT_WORK_WITH_COMBINE_GVCFS, b38_reference_20_21, new String[0], 1, 0, true);
}

@Test
public void testGenomicsDBImportFileInputsAgainstCombineGVCFWithMultipleIntervalsWithMultipleThreads() throws IOException {
testGenomicsDBAgainstCombineGVCFs(LOCAL_GVCFS, MULTIPLE_INTERVALS_THAT_WORK_WITH_COMBINE_GVCFS, b38_reference_20_21,
new String[0], 4);
}

@Test
public void testGenomicsDBImportFileInputsAgainstCombineGVCFWithMultipleIntervalsWithMultipleThreadsNativeReader() throws IOException {
testGenomicsDBAgainstCombineGVCFs(LOCAL_GVCFS, MULTIPLE_INTERVALS_THAT_WORK_WITH_COMBINE_GVCFS, b38_reference_20_21,
new String[0], 4, 0, true);
}

@Test
public void testGenomicsDBImportFileInputsAgainstCombineGVCFWithMultipleNonAdjacentIntervals() throws IOException {
testGenomicsDBAgainstCombineGVCFs(LOCAL_GVCFS, MULTIPLE_NON_ADJACENT_INTERVALS_THAT_WORK_WITH_COMBINE_GVCFS,
b38_reference_20_21, new String[0]);
}

@Test
public void testGenomicsDBImportFileInputsAgainstCombineGVCFWithMultipleNonAdjacentIntervalsNativeReader() throws IOException {
testGenomicsDBAgainstCombineGVCFs(LOCAL_GVCFS, MULTIPLE_NON_ADJACENT_INTERVALS_THAT_WORK_WITH_COMBINE_GVCFS,
b38_reference_20_21, new String[0], 1, 0, true);
}

@Test
public void testGenomicsDBImportFileInputsAgainstCombineGVCFWithMultipleNonAdjacentIntervalsForFilesProducedAfterCombineGVCFs()
throws IOException {
@@ -276,6 +308,14 @@ public void testGenomicsDBImportFileInputsAgainstCombineGVCFWithMultipleNonAdjac
b38_reference_20_21, new String[0]);
}

@Test
public void testGenomicsDBImportFileInputsAgainstCombineGVCFWithMultipleNonAdjacentIntervalsForFilesProducedAfterCombineGVCFsNativeReader()
throws IOException {
//this test covers the scenario where the input vcfs have spanning deletions
testGenomicsDBAgainstCombineGVCFs(LOCAL_GVCFS_AFTER_COMBINE_GVCFS, MULTIPLE_NON_ADJACENT_INTERVALS_THAT_WORK_WITH_COMBINE_GVCFS,
b38_reference_20_21, new String[0], 1, 0, true);
}

@Test
public void testGenomicsDBImportFileInputsAgainstCombineGVCFWithNonDiploidData() throws IOException {
testGenomicsDBImporterWithGenotypes(Arrays.asList(NA12878_HG37, MULTIPLOID_DATA_HG37), INTERVAL_NONDIPLOID,
@@ -307,6 +347,12 @@ public void testGenomicsDBThreeLargeSamplesWithGenotypes() throws IOException {
testGenomicsDBImporterWithGenotypes(LOCAL_GVCFS, intervals, COMBINED_WITH_GENOTYPES, b38_reference_20_21, true, true, false);
}

@Test
public void testGenomicsDBThreeLargeSamplesWithGenotypesNativeReader() throws IOException {
ArrayList<SimpleInterval> intervals = new ArrayList<SimpleInterval>(Arrays.asList(new SimpleInterval("chr20", 1, 64444167)));
testGenomicsDBImporterWithGenotypes(LOCAL_GVCFS, intervals, COMBINED_WITH_GENOTYPES, b38_reference_20_21, true, true, false, true);
}

@Test
public void testGenomicsDBThreeLargeSamplesSitesOnlyQuery() throws IOException {
ArrayList<SimpleInterval> intervals = new ArrayList<SimpleInterval>(Arrays.asList(
@@ -504,11 +550,21 @@ public void testGenomicsDBImportFileInputsInBatchesWithMultipleIntervals(final i
testGenomicsDBImporterWithBatchSize(LOCAL_GVCFS, MULTIPLE_INTERVALS, COMBINED_MULTI_INTERVAL, batchSize);
}

@Test(dataProvider = "batchSizes")
public void testGenomicsDBImportFileInputsInBatchesWithMultipleIntervalsNativeReader(final int batchSize) throws IOException {
testGenomicsDBImporterWithBatchSize(LOCAL_GVCFS, MULTIPLE_INTERVALS, COMBINED_MULTI_INTERVAL, batchSize, true);
}

@Test(groups = {"bucket"}, dataProvider = "batchSizes")
public void testGenomicsDBImportGCSInputsInBatches(final int batchSize) throws IOException {
testGenomicsDBImporterWithBatchSize(resolveLargeFilesAsCloudURIs(LOCAL_GVCFS), INTERVAL, COMBINED, batchSize);
}

@Test(groups = {"bucket"}, dataProvider = "batchSizes")
public void testGenomicsDBImportGCSInputsInBatchesNativeReader(final int batchSize) throws IOException {
testGenomicsDBImporterWithBatchSize(resolveLargeFilesAsCloudURIs(LOCAL_GVCFS), INTERVAL, COMBINED, batchSize, true);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

testGenomicsDBImporterWithBatchSize() does not propagate the useNativeReader boolean correctly into writeToGenomicsDB()

Contributor (author):

Fixed.

}

@DataProvider
public Object[][] getThreads(){
return new Object[][] {
@@ -581,7 +637,7 @@ private void testGenomicsDBImporterWithBatchSize(final List<String> vcfInputs, f
final boolean useNativeReader) throws IOException {
final String workspace = createTempDir("genomicsdb-batchsize-tests-").getAbsolutePath() + "/workspace-" + batchSize;

writeToGenomicsDB(vcfInputs, intervals, workspace, batchSize, false, 0, 1, false, false, false, 0, true);
writeToGenomicsDB(vcfInputs, intervals, workspace, batchSize, false, 0, 1, false, false, false, 0, useNativeReader);
checkJSONFilesAreWritten(workspace);
checkGenomicsDBAgainstExpected(workspace, intervals, expectedCombinedVCF, b38_reference_20_21, true, ATTRIBUTES_TO_IGNORE);
}
@@ -632,6 +688,7 @@ private void writeToGenomicsDB(final List<String> vcfInputs, final List<SimpleIn
if (chrsToPartitions != 0) {
args.add(GenomicsDBImport.MERGE_CONTIGS_INTO_NUM_PARTITIONS, String.valueOf(chrsToPartitions));
}
args.add(GenomicsDBImport.BYPASS_FEATURE_READER, useNativeReader);
if (useBufferSize) {
args.add("genomicsdb-vcf-buffer-size", String.valueOf(bufferSizePerSample));
}
@@ -1091,10 +1148,18 @@ public void testIncrementalMustHaveExistingWorkspace() {
private void testIncrementalImport(final int stepSize, final List<SimpleInterval> intervals, final String workspace,
final int batchSize, final boolean produceGTField, final boolean useVCFCodec, final String expected,
final int chrsToPartitions, final boolean useNativeReader) throws IOException {
testIncrementalImport(stepSize, intervals, workspace, batchSize, produceGTField, useVCFCodec, expected,
chrsToPartitions, useNativeReader, false);
}

private void testIncrementalImport(final int stepSize, final List<SimpleInterval> intervals, final String workspace,
final int batchSize, final boolean produceGTField, final boolean useVCFCodec, final String expected,
final int chrsToPartitions, final boolean useNativeReader, final boolean useNativeReaderInitial)
throws IOException {
for(int i=0; i<LOCAL_GVCFS.size(); i+=stepSize) {
int upper = Math.min(i+stepSize, LOCAL_GVCFS.size());
writeToGenomicsDB(LOCAL_GVCFS.subList(i, upper), intervals, workspace, batchSize, false, 0, 1, false, false, i!=0,
chrsToPartitions, i!=0 && useNativeReader);
chrsToPartitions, (i == 0 && useNativeReaderInitial) || (i > 0 && useNativeReader));
checkJSONFilesAreWritten(workspace);
}
for(SimpleInterval currInterval : intervals) {
@@ -1120,13 +1185,33 @@ public void testGenomicsDBBasicIncremental() throws IOException {
createAndCheckIntervalListFromExistingWorkspace(workspace, INTERVAL_PICARD_STYLE_EXPECTED);
}

@Test
public void testGenomicsDBBasicIncrementalAllNativeReader() throws IOException {
final String workspace = createTempDir("genomicsdb-incremental-tests").getAbsolutePath() + "/workspace";
testIncrementalImport(2, INTERVAL, workspace, 0, true, true, COMBINED_WITH_GENOTYPES, 0, true, true);
createAndCheckIntervalListFromExistingWorkspace(workspace, INTERVAL_PICARD_STYLE_EXPECTED);
}

@Test
public void testGenomicsDBIncrementalAndBatchSize1WithNonAdjacentIntervals() throws IOException {
final String workspace = createTempDir("genomicsdb-incremental-tests").getAbsolutePath() + "/workspace";
testIncrementalImport(2, MULTIPLE_NON_ADJACENT_INTERVALS_THAT_WORK_WITH_COMBINE_GVCFS, workspace, 1, false, true, "", 0, false);
createAndCheckIntervalListFromExistingWorkspace(workspace, MULTIPLE_NON_ADJACENT_INTERVALS_THAT_WORK_WITH_COMBINE_GVCFS_PICARD_STYLE_EXPECTED);
}

@Test
public void testGenomicsDBIncrementalAndBatchSize1WithNonAdjacentIntervalsNativeReader() throws IOException {
final String workspace = createTempDir("genomicsdb-incremental-tests").getAbsolutePath() + "/workspace";
testIncrementalImport(2, MULTIPLE_NON_ADJACENT_INTERVALS_THAT_WORK_WITH_COMBINE_GVCFS, workspace, 1, false, true, "", 0, true);
Collaborator:

testIncrementalImport() does not use the native reader for the first batch (i == 0) -- why is that?

Contributor (author):

Ah - not entirely sure anymore; I think I wanted to check that a given workspace could be imported to using both the feature reader and htslib. Refactored a bit to make that clearer, and added a test that does an all-htslib/native incremental import.
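
For clarity, the per-batch reader selection after this refactor can be restated as follows (this only rephrases the condition already added to testIncrementalImport above; it is not new behavior):

    // batch 0 (i == 0) uses the native reader only when useNativeReaderInitial is set;
    // later batches use it whenever useNativeReader is set
    final boolean useNative = (i == 0 && useNativeReaderInitial) || (i > 0 && useNativeReader);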

createAndCheckIntervalListFromExistingWorkspace(workspace, MULTIPLE_NON_ADJACENT_INTERVALS_THAT_WORK_WITH_COMBINE_GVCFS_PICARD_STYLE_EXPECTED);
}

@Test(expectedExceptions = {UserException.class}, expectedExceptionsMessageRegExp=".*must be block compressed.*")
public void testGenomicsDBImportNativeReaderNoCompressedVcf() throws IOException {
testGenomicsDBImporterWithGenotypes(Arrays.asList(NA_12878_PHASED), MULTIPLE_INTERVALS, NA_12878_PHASED, b37_reference_20_21,
false, true, false, true);
}

@Test
public void testGenomicsDBIncrementalAndBatchSize1WithNonAdjacentIntervalsMergeContigsIntoPartitions() throws IOException {
final String workspace = createTempDir("genomicsdb-incremental-tests").getAbsolutePath() + "/workspace";
@@ -104,7 +104,7 @@ public void testLoadSampleNameMapFileInSortedOrder(final String sampleMapText){
catch(URISyntaxException e) {
throw new RuntimeException("Malformed URI "+e.toString());
}
final Map<String, URI> actual = GenomicsDBImport.loadSampleNameMapFileInSortedOrder(sampleFile.toPath());
final Map<String, URI> actual = GenomicsDBImport.loadSampleNameMapFileInSortedOrder(sampleFile.toPath(), false);
Assert.assertEquals(actual, expected);
Assert.assertEquals(actual.keySet().iterator().next(), "Sample1");
}