
Poor performance for QC filtering on 1KG data #148

Closed
eric-czech opened this issue Jan 31, 2020 · 7 comments
@eric-czech

Hi, I'm trying to run some basic QC on a 1KG dataset with ~25M variants and 629 samples, and I'm having trouble making it efficient with Glow. The same operations take 35 seconds with PLINK and about 20 minutes with Hail, but at least 3 hours with Glow (I killed the jobs after that long, so I'm not sure how long they would ultimately have taken). Is there some way I can rewrite this more efficiently?

// Load a PLINK dataset generated as part of the Marees et al. 2018 tutorial
// and write it as parquet (see: https://www.ncbi.nlm.nih.gov/pubmed/29484742)
// dataDir is a path (e.g. os-lib / better.files) defined elsewhere
val dataFile = "ALL.2of4intersection.20100804.genotypes_no_missing_IDs"
ss.read.format("plink")
    .option("famDelimiter", "\t")
    .option("bimDelimiter", "\t")
    .load((dataDir / (dataFile + ".bed")).toString)
    .write.parquet((dataDir / (dataFile + ".parquet")).toString)

val df = ss.read.parquet((dataDir / (dataFile + ".parquet")).toString)

// QC Functions
def count(df: DataFrame) = (df.count, df.select(size(col("genotypes"))).first.getAs[Int](0)) // (n_variants, n_samples)

def filterBySampleCallRate(threshold: Double)(df: DataFrame): DataFrame = { 
    df
        // Cross join original dataset with single-row data frame containing a map like (sampleId -> QC stats)
        .crossJoin(
            df
            .selectExpr("sample_call_summary_stats(genotypes, referenceAllele, alternateAlleles) as qc")
            .selectExpr("map_from_arrays(qc.sampleId, qc) as qc")
        )
        // For each row, filter the genotypes array (which has one element per sampleId) based on QC map lookup
        .selectExpr("*", s"filter(genotypes, g -> qc[g.sampleId].callRate >= ${threshold}) as filtered_genotypes")
        // Remove intermediate fields 
        .drop("qc", "genotypes").withColumnRenamed("filtered_genotypes", "genotypes")
        // Ensure that the original dataset schema was preserved
        .transform(d => {assert(d.schema.equals(df.schema)); d})
}

def filterByVariantCallRate(threshold: Double)(df: DataFrame): DataFrame = { 
    df
        .selectExpr("*", "call_summary_stats(genotypes) as qc")
        .filter(col("qc.callRate") >= threshold)
        .drop("qc")
        .transform(d => {assert(d.schema.equals(df.schema)); d})
}

val df_qc1 = df
    .transform(filterByVariantCallRate(threshold=.8))
    .transform(filterBySampleCallRate(threshold=.8))
    .transform(filterByVariantCallRate(threshold=.98))
    .transform(filterBySampleCallRate(threshold=.98))


println(count(df_qc1))

I'm using the latest version of Glow, but I'm running on my own snapshot build with the fix for #135 (which is necessary for the above to run). I'm also using Spark 2.4.4 in local mode with a 64G heap size (on a 128G, 16-core machine). I know local mode is not a typical use case, but I don't see what the problem is here since all 16 cores are at nearly 100% utilization for the full 3 hours. It's also not GC pressure: only ~32G of the heap is actually being used, and according to the Spark UI, the task-time-to-GC-time ratio is 4.6 h to 1.4 min for one of these jobs after a couple of hours.
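For reference, a minimal sketch of the local-mode setup described above (the app name, master string, and Glow registration call are assumptions based on the description; the driver heap generally has to be set when the JVM is launched, e.g. via spark-submit --driver-memory 64g, rather than on an already-running session):

import org.apache.spark.sql.SparkSession
import io.projectglow.Glow

// Hypothetical local-mode session matching the description above.
// Note: spark.driver.memory only takes effect if set before the driver JVM
// starts (e.g. spark-submit --driver-memory 64g or spark-defaults.conf).
val ss = SparkSession.builder()
  .appName("glow-1kg-qc")
  .master("local[16]")                    // all 16 cores on the local machine
  .config("spark.driver.memory", "64g")   // see note above
  .getOrCreate()
Glow.register(ss)                         // register Glow's SQL functions (sample_call_summary_stats, etc.)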

Do you have any ideas what might be going on here? Thank you!

@karenfeng karenfeng self-assigned this Feb 5, 2020
@karenfeng (Collaborator) commented Feb 5, 2020

Hi @eric-czech, I observed a significant performance improvement by skipping the map_from_arrays step and transforming the genotypes array based on index (which matches that in the QC array). More specifically, I rewrote the filterBySampleCallRate function as follows:

def filterBySampleCallRate(threshold: Double)(df: DataFrame): DataFrame = {
  val qc = df.selectExpr("sample_call_summary_stats(genotypes, referenceAllele, alternateAlleles) as qc")
  df.crossJoin(qc)
    // For each row, filter the genotypes array (which has one element per sampleId) based on whether it passed the QC filter
    .selectExpr("*", s"filter(genotypes, (g, i) -> qc[i].callRate >= $threshold) as filtered_genotypes") 
    // Remove intermediate fields 
    .drop("qc", "genotypes")
    .withColumnRenamed("filtered_genotypes", "genotypes")
    // Ensure that the original dataset schema was preserved
    .transform(d => {assert(d.schema.equals(df.schema)); d})
}

On a 1-worker (r4.4xlarge) cluster, running the rewritten analysis on a DataFrame constructed directly from the PLINK files took 30 minutes; the original analysis took 2 hours.

There are likely more performance gains to be made here, but this was the clearest change to make for me.

@eric-czech (Author)

Hey @karenfeng thank you!

I can confirm that running a version like yours without maps is much faster, though I had to modify it slightly since my version of Spark doesn't yet have a filter overload with the (value, index) signature. For anyone else in the same situation who happens across this issue, this is the version that worked for me:

def filterBySampleCallRate(threshold: Double)(df: DataFrame): DataFrame = {
  val qc = df.selectExpr("sample_call_summary_stats(genotypes, referenceAllele, alternateAlleles) as qc")
  df.crossJoin(qc)
    // For each row, filter the genotypes array (which has one element per sampleId) based on whether it passed the QC filter
    .selectExpr("*", s"""
        transform(
            filter(
                zip_with(sequence(0, size(genotypes)), genotypes, (i, g) -> (i, g)), e -> 
                qc[e.i].callRate >= $threshold
            ), 
            e -> e.g
        ) as filtered_genotypes
    """)
    // Remove intermediate fields 
    .drop("qc", "genotypes")
    .withColumnRenamed("filtered_genotypes", "genotypes")
    // Ensure that the original dataset schema was preserved
    .transform(d => {assert(d.schema.equals(df.schema)); d})
}

I found that this runs in about 1 hour and 25 mins on my setup (a comparable one to r4.4xlarge) when reading from parquet.

A couple of final thoughts before I close this out:

Have you all at Databricks/Regeneron thought about how to propagate some bitpacked or otherwise optimized schema through these kinds of pipelines? I think the difference you saw when reading directly from PLINK files is pretty telling, so I was wondering if you had ideas on how to maintain a more efficient data representation as parquet throughout a whole pipeline.

I was also wondering if you had any plans/ideas for how to get methods like VariantQC to benefit from some form of vectorization/data parallelism. I'm finding in some benchmarks with other systems that this also makes for a huge difference in performance, so I was curious if optimizations like either of those are on the roadmap.

Thank you again for taking the time to take a look at that code!

@henrydavidge (Contributor)

@eric-czech fwiw, I think that the filter approach in Spark 3.0 will be significantly faster than the zip_with since it will avoid materializing an additional array with the genotypes and indices together. Also, I think that both runs in @karenfeng's timing comparison are from plink (please correct me if I'm wrong, Karen).

Those are great questions! I believe that the parquet spec actually will enable very efficient encoding for genotype data. Both the PLAIN_DICTIONARY and RLE encodings should work well for highly repetitive fields. I think that we just need to spend some time making sure that Glow / Spark / parquet-mr are picking the optimal encodings.
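For what it's worth, a rough sketch of where those knobs live when writing parquet from Spark: parquet.enable.dictionary and parquet.block.size are standard parquet-mr properties read from the Hadoop configuration, though whether tweaking them actually helps Glow's genotype schema is exactly the open question here (values and paths below are illustrative):

// Experiment with parquet-mr encoding/row-group settings before rewriting the data.
val hadoopConf = ss.sparkContext.hadoopConfiguration
hadoopConf.set("parquet.enable.dictionary", "true")        // dictionary encoding (on by default)
hadoopConf.setInt("parquet.block.size", 256 * 1024 * 1024) // larger row groups -> more rows share each per-row-group dictionary

ss.conf.set("spark.sql.parquet.compression.codec", "gzip") // or snappy/lz4

df.write.mode("overwrite").parquet("/tmp/genotypes_reencoded.parquet") // illustrative path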

Regarding SIMD optimizations, we don't have immediate plans to do this directly in Glow, but Databricks engineering is working on some exciting projects to use these techniques to speed up many workloads including genomics. Stay tuned ;).

@eric-czech (Author) commented Feb 14, 2020

Thanks @henrydavidge. It looks like the integer call array field ends up as RLE encoded PLAIN_DICTIONARY values in this dataset, if I'm interpreting parquet-tools correctly. It also looks like that's the default behavior if the number of unique values is low enough, so all fields have either that or a plain encoding.

The entire parquet dataset with the Glow schema is about 5.3G while the comparable Hail MatrixTable is 2.3G (about 43% as big), so there's presumably still a good way to go in shrinking the serialized parquet data.

I did notice in the row page statistics that the sampleIds are really dominating the overall space taken. In one page from one file at least, that field is accounting for ~60% of all the bytes stored while calls are 37% and everything else is marginal. I'm not sure if there's a better encoding than RLE + PLAIN_DICTIONARY for the calls, but fwiw I think getting those sample ids out of the same array as the calls would make a huge difference. Or maybe there's some way to get parquet to recognize that the sample ids are essentially the same in every row? Either way, I bet this is a big reason why the Hail/PLINK files are so much smaller.
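For anyone who wants to reproduce that per-column breakdown programmatically rather than with parquet-tools, something along these lines (parquet-mr 1.10-era API; the file path is illustrative) reads the footer of one part file and prints each column chunk's encodings and on-disk size:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.ParquetFileReader
import scala.collection.JavaConverters._

// Read the footer of a single part file from the dataset (path is illustrative).
val footer = ParquetFileReader.readFooter(
  new Configuration(),
  new Path("/data/1kg.parquet/part-00000.snappy.parquet"),
  ParquetMetadataConverter.NO_FILTER)

// Print path, encodings, and compressed size for each column chunk in the first row group.
footer.getBlocks.asScala.head.getColumns.asScala.foreach { c =>
  println(s"${c.getPath} encodings=${c.getEncodings} bytes=${c.getTotalSize}")
}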

Also, @karenfeng do you have a specific EC2 image you often use for tests like this? I'm curious why you saw that take 30 minutes while it was more like 90 for me, so I was hoping to get a better understanding of the kind of configuration you were running with. Was it local mode or standalone? Maybe that's a big part of the difference, given that the specs of the machines are so similar.

@hammer commented Feb 14, 2020

Regarding SIMD optimizations, we don't have immediate plans to do this directly in Glow, but Databricks engineering is working on some exciting projects to use these techniques to speed up many workloads including genomics. Stay tuned ;).

Will this work be made available as open source?

@eric-czech, it appears that Intel is trying to make use of Gandiva to generate code that includes SIMD instructions.

@henrydavidge (Contributor)

@eric-czech I was looking over old issues and saw this. Sorry I never responded...

I looked into encodings further, and actually I don't think there's much we can do. On datasets with many samples, each row can grow large enough that we only have a few rows in each parquet row group. Dictionaries for encoding are built per row group, so if the sample ids are not repeated many times, we don't get much compression.

You're definitely right that for GWAS type use cases it's best to get the sample IDs out of the genotypes array. Most of our customers write out the sample IDs and the index separately and use that for filtering. Of course, that's less than ideal from an API perspective and we're looking at a few different options like providing functionality for moving sample ids to/from the column metadata. If you have thoughts feel free to chime in.
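Not Glow's API, just a sketch of that "store the sample IDs once, address genotypes by index" pattern, assuming the genotype struct here carries only sampleId, calls, and phased (adjust the kept fields to whatever the actual schema contains):

import org.apache.spark.sql.functions.expr

// Capture the sample IDs once; they appear in the same order in every row.
val sampleIds: Seq[String] = df
  .selectExpr("transform(genotypes, g -> g.sampleId) as sample_ids")
  .first
  .getSeq[String](0)

// Rebuild each genotype struct without sampleId, so parquet no longer stores the
// IDs on every row; downstream filters address samples by position in sampleIds.
val slim = df.withColumn(
  "genotypes",
  expr("transform(genotypes, g -> named_struct('calls', g.calls, 'phased', g.phased))"))

slim.write.parquet("/tmp/genotypes_without_sample_ids.parquet") // illustrative path
// sampleIds can then be written out separately as a small one-column table.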

@henrydavidge (Contributor)

Closing this old issue.
